diff --git a/.github/workflows/publish-dts-sdk.yml b/.github/workflows/durabletask-azuremanaged.yml similarity index 87% rename from .github/workflows/publish-dts-sdk.yml rename to .github/workflows/durabletask-azuremanaged.yml index de773f2..73017e4 100644 --- a/.github/workflows/publish-dts-sdk.yml +++ b/.github/workflows/durabletask-azuremanaged.yml @@ -1,4 +1,4 @@ -name: Publish Durable Task Scheduler to PyPI +name: Durable Task Scheduler SDK (durabletask-azuremanaged) on: push: @@ -15,10 +15,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up Python 3.12 + - name: Set up Python 3.13 uses: actions/setup-python@v5 with: - python-version: 3.12 + python-version: 3.13 - name: Install dependencies working-directory: durabletask-azuremanaged run: | @@ -28,10 +28,17 @@ jobs: - name: Run flake8 Linter working-directory: durabletask-azuremanaged run: flake8 . + - name: Run flake8 Linter + working-directory: tests/durabletask-azuremanaged + run: flake8 . run-docker-tests: + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] env: - EMULATOR_VERSION: "v0.0.5" # Define the variable + EMULATOR_VERSION: "latest" needs: lint runs-on: ubuntu-latest steps: @@ -84,7 +91,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: "3.12" # Adjust Python version as needed + python-version: "3.13" # Adjust Python version as needed - name: Install dependencies run: | diff --git a/.github/workflows/durabletask.yml b/.github/workflows/durabletask.yml new file mode 100644 index 0000000..4fb3fb0 --- /dev/null +++ b/.github/workflows/durabletask.yml @@ -0,0 +1,108 @@ +name: Durable Task SDK (durabletask) + +on: + push: + branches: + - "main" + tags: + - "v*" # Only run for tags starting with "v" + pull_request: + branches: + - "main" + +jobs: + lint-and-unit-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.13 + uses: actions/setup-python@v5 + with: + python-version: 3.13 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools wheel tox + pip install flake8 + - name: Run flake8 Linter + working-directory: durabletask + run: flake8 . + - name: "Run flake8 linter: tests" + working-directory: tests/durabletask + run: flake8 . + - name: "Run flake8 linter: examples" + working-directory: examples + run: flake8 . + + run-tests: + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + needs: lint-and-unit-tests + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install durabletask dependencies and the library itself + run: | + python -m pip install --upgrade pip + pip install flake8 pytest + pip install -r requirements.txt + pip install . + - name: Pytest unit tests + working-directory: tests/durabletask + run: | + pytest -m "not e2e and not dts" --verbose + # Sidecar for running e2e tests requires Go SDK + - name: Install Go SDK + uses: actions/setup-go@v5 + with: + go-version: 'stable' + # Install and run the durabletask-go sidecar for running e2e tests + - name: Pytest e2e tests + working-directory: tests/durabletask + run: | + go install github.com/microsoft/durabletask-go@main + durabletask-go --port 4001 & + pytest -m "e2e and not dts" --verbose + + publish: + if: startsWith(github.ref, 'refs/tags/v') # Only run if a matching tag is pushed + needs: run-tests + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Extract version from tag + run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV # Extract version from the tag + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.13" # Adjust Python version as needed + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Build package from root directory + run: | + python -m build + + - name: Check package + run: | + twine check dist/* + + - name: Publish package to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} # Store your PyPI API token in GitHub Secrets + run: | + twine upload dist/* \ No newline at end of file diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml deleted file mode 100644 index 1d14d83..0000000 --- a/.github/workflows/pr-validation.yml +++ /dev/null @@ -1,59 +0,0 @@ -# This workflow will install Python dependencies, run tests and lint with a variety of Python versions -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python - -name: Build Validation - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - merge_group: - -jobs: - build: - - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] - - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - name: Install durabletask dependencies and the library itself in editable mode - run: | - python -m pip install --upgrade pip - pip install flake8 pytest - pip install -r requirements.txt - pip install -e . - - name: Install durabletask-azuremanaged dependencies - working-directory: examples/dts - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - name: Lint with flake8 - run: | - flake8 . --count --show-source --statistics --exit-zero - - name: Pytest unit tests - working-directory: tests/durabletask - run: | - pytest -m "not e2e and not dts" --verbose - - # Sidecar for running e2e tests requires Go SDK - - name: Install Go SDK - uses: actions/setup-go@v5 - with: - go-version: 'stable' - - # Install and run the durabletask-go sidecar for running e2e tests - - name: Pytest e2e tests - working-directory: tests/durabletask - run: | - go install github.com/microsoft/durabletask-go@main - durabletask-go --port 4001 & - pytest -m "e2e and not dts" --verbose diff --git a/pyproject.toml b/pyproject.toml index 1491988..5438ca4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,8 @@ license = {file = "LICENSE"} readme = "README.md" dependencies = [ "grpcio", - "protobuf" + "protobuf", + "asyncio" ] [project.urls] diff --git a/requirements.txt b/requirements.txt index 0da7d46..721453b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,5 @@ grpcio>=1.60.0 # 1.60.0 is the version introducing protobuf 1.25.X support, newe protobuf pytest pytest-cov -azure-core -azure-identity \ No newline at end of file +azure-identity +asyncio \ No newline at end of file diff --git a/tests/durabletask-azuremanaged/test_dts_activity_sequence.py b/tests/durabletask-azuremanaged/test_dts_activity_sequence.py index c875e49..1a685d0 100644 --- a/tests/durabletask-azuremanaged/test_dts_activity_sequence.py +++ b/tests/durabletask-azuremanaged/test_dts_activity_sequence.py @@ -2,15 +2,15 @@ that calls an activity function in a sequence and prints the outputs.""" import os +import pytest + from durabletask import client, task from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker -import pytest - - pytestmark = pytest.mark.dts + def hello(ctx: task.ActivityContext, name: str) -> str: """Activity function that returns a greeting""" return f'Hello {name}!' diff --git a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py index f10e605..9b7603f 100644 --- a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py @@ -2,9 +2,8 @@ # Licensed under the MIT License. import json -import threading -import time import os +import threading from datetime import timedelta import pytest @@ -21,6 +20,7 @@ taskhub_name = os.getenv("TASKHUB", "default") endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + def test_empty_orchestration(): invoked = False @@ -31,12 +31,12 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(empty_orchestrator) w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = c.schedule_new_orchestration(empty_orchestrator) state = c.wait_for_orchestration_completion(id, timeout=30) @@ -66,13 +66,13 @@ def sequence(ctx: task.OrchestrationContext, start_val: int): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(sequence) w.add_activity(plus_one) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(sequence, input=1) state = task_hub_client.wait_for_orchestration_completion( id, timeout=30) @@ -113,14 +113,14 @@ def orchestrator(ctx: task.OrchestrationContext, input: int): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(orchestrator) w.add_activity(throw) w.add_activity(increment_counter) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(orchestrator, input=1) state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) @@ -158,14 +158,14 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_activity(increment) w.add_orchestrator(orchestrator_child) w.add_orchestrator(parent_orchestrator) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=10) state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) @@ -184,13 +184,13 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(orchestrator) w.start() # Start the orchestration and immediately raise events to it. task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(orchestrator) task_hub_client.raise_orchestration_event(id, 'A', data='a') task_hub_client.raise_orchestration_event(id, 'B', data='b') @@ -285,12 +285,12 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(orchestrator) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(orchestrator) state = task_hub_client.wait_for_orchestration_start(id, timeout=30) assert state is not None @@ -302,23 +302,25 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state.runtime_status == client.OrchestrationStatus.TERMINATED assert state.serialized_output == json.dumps("some reason for termination") + def test_terminate_recursive(): def root(ctx: task.OrchestrationContext, _): result = yield ctx.call_sub_orchestrator(child) return result + def child(ctx: task.OrchestrationContext, _): result = yield ctx.wait_for_external_event("my_event") return result # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(root) w.add_orchestrator(child) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(root) state = task_hub_client.wait_for_orchestration_start(id, timeout=30) assert state is not None @@ -331,7 +333,7 @@ def child(ctx: task.OrchestrationContext, _): assert state.runtime_status == client.OrchestrationStatus.TERMINATED # Verify that child orchestration is also terminated - c = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + task_hub_client.wait_for_orchestration_completion(id, timeout=30) assert state is not None assert state.runtime_status == client.OrchestrationStatus.TERMINATED @@ -417,14 +419,14 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _): raise RuntimeError("Kah-BOOOOM!!!") with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(parent_orchestrator_with_retry) w.add_orchestrator(child_orchestrator_with_retry) w.add_activity(throw_activity_with_retry) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(parent_orchestrator_with_retry) state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) assert state is not None @@ -460,13 +462,13 @@ def throw_activity(ctx: task.ActivityContext, _): raise RuntimeError("Kah-BOOOOM!!!") with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(mock_orchestrator) w.add_activity(throw_activity) w.start() task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = task_hub_client.schedule_new_orchestration(mock_orchestrator) state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) assert state is not None @@ -477,6 +479,7 @@ def throw_activity(ctx: task.ActivityContext, _): assert state.failure_details.stack_trace is not None assert throw_activity_counter == 4 + def test_custom_status(): def empty_orchestrator(ctx: task.OrchestrationContext, _): @@ -484,12 +487,12 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) as w: + taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(empty_orchestrator) w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=None) + taskhub=taskhub_name, token_credential=None) id = c.schedule_new_orchestration(empty_orchestrator) state = c.wait_for_orchestration_completion(id, timeout=30) diff --git a/tests/durabletask-azuremanaged/test_durabletask_grpc_interceptor.py b/tests/durabletask-azuremanaged/test_durabletask_grpc_interceptor.py index 62978f9..0480d3d 100644 --- a/tests/durabletask-azuremanaged/test_durabletask_grpc_interceptor.py +++ b/tests/durabletask-azuremanaged/test_durabletask_grpc_interceptor.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -import threading import unittest from concurrent import futures from importlib.metadata import version @@ -9,20 +8,17 @@ import grpc from durabletask.azuremanaged.client import DurableTaskSchedulerClient -from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import ( - DTSDefaultClientInterceptorImpl, -) from durabletask.internal import orchestrator_service_pb2 as pb from durabletask.internal import orchestrator_service_pb2_grpc as stubs class MockTaskHubSidecarServiceServicer(stubs.TaskHubSidecarServiceServicer): """Mock implementation of the TaskHubSidecarService for testing.""" - + def __init__(self): self.captured_metadata = {} self.requests_received = 0 - + def GetInstance(self, request, context): """Implementation of GetInstance that captures the metadata.""" # Store all metadata key-value pairs from the context @@ -38,7 +34,7 @@ def GetInstance(self, request, context): class TestDurableTaskGrpcInterceptor(unittest.TestCase): """Tests for the DTSDefaultClientInterceptorImpl class.""" - + @classmethod def setUpClass(cls): # Start a real gRPC server on a free port @@ -52,11 +48,11 @@ def setUpClass(cls): # Start the server in a background thread cls.server.start() - + @classmethod def tearDownClass(cls): cls.server.stop(grace=None) - + def test_user_agent_metadata_passed_in_request(self): """Test that the user agent metadata is correctly passed in gRPC requests.""" # Create a client that connects to our mock server diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index d3d7f0b..3ccf782 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -278,10 +278,12 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state.runtime_status == client.OrchestrationStatus.TERMINATED assert state.serialized_output == json.dumps("some reason for termination") + def test_terminate_recursive(): def root(ctx: task.OrchestrationContext, _): result = yield ctx.call_sub_orchestrator(child) return result + def child(ctx: task.OrchestrationContext, _): result = yield ctx.wait_for_external_event("my_event") return result @@ -305,7 +307,7 @@ def child(ctx: task.OrchestrationContext, _): assert state.runtime_status == client.OrchestrationStatus.TERMINATED # Verify that child orchestration is also terminated - c = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + task_hub_client.wait_for_orchestration_completion(id, timeout=30) assert state is not None assert state.runtime_status == client.OrchestrationStatus.TERMINATED @@ -321,7 +323,7 @@ def orchestrator(ctx: task.OrchestrationContext, input: int): result = yield ctx.wait_for_external_event("my_event") if not ctx.is_replaying: # NOTE: Real orchestrations should never interact with nonlocal variables like this. - nonlocal all_results + nonlocal all_results # noqa: F824 all_results.append(result) if len(all_results) <= 4: @@ -445,6 +447,7 @@ def throw_activity(ctx: task.ActivityContext, _): assert state.failure_details.stack_trace is not None assert throw_activity_counter == 4 + def test_custom_status(): def empty_orchestrator(ctx: task.OrchestrationContext, _):