Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
build/*
dist/*
docs_src/_build/
examples/config.yaml
poetry.lock
tamr_cloud_sdk.egg-info
__pycache__/
10 changes: 8 additions & 2 deletions docs_src/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

## Jobs

Fetch a list of jobs:
Fetch a list of jobs and check for statuses:

```{literalinclude} ../examples/list_jobs.py
```{literalinclude} ../examples/jobs/list_jobs.py
:language: python
```

Poll a job until complete:

```{literalinclude} ../examples/jobs/poll_job.py
:language: python
```
2 changes: 2 additions & 0 deletions examples/config.template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tamr_cloud_host: <your-tamr-cloud-host>
tamr_api_key: <your-tamr-cloud-api-key>
60 changes: 60 additions & 0 deletions examples/jobs/list_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Example script for fetching jobs from the Jobs API."""

import logging
import os
import sys

import yaml

from tamr.api.v1beta1.jobs_pb2 import JobState
from tamr_sdk.api_client import TamrApiClient

# Number of jobs requested per page
PAGE_SIZE = 25
# Upper bound on the number of pages fetched before giving up
NUM_PAGES_TO_CHECK = 10
# Names of the job states to search for
STATUS_STRS = ["PENDING", "RUNNING"]
# Enum values corresponding to the names in `STATUS_STRS`
STATUSES = [JobState.Value(s) for s in STATUS_STRS]

if __name__ == "__main__":
    # Set up logging
    logger = logging.getLogger()
    logger.setLevel("INFO")
    logger.addHandler(logging.StreamHandler(sys.stdout))

    # Read Tamr Cloud configurations from file
    dir_path = os.path.dirname(os.path.realpath(__file__))
    config_path = os.path.join(dir_path, "..", "config.yaml")
    with open(config_path) as stream:
        config = yaml.safe_load(stream)

    # Initialize Tamr Cloud client
    tamr_client = TamrApiClient(
        config["tamr_cloud_host"],
        [("x-api-key", config["tamr_api_key"])],
        grpc_stack_trace=True,
    )
    jobs_client = tamr_client.jobs()
    logger.info("Client initialization complete.")

    # Find most recent job with status in `STATUSES`
    # (assumes the service lists jobs most recent first -- confirm against the
    # Jobs API documentation)
    next_page_token = None
    first_matching_job = None
    num_jobs_checked = 0

    for _ in range(NUM_PAGES_TO_CHECK):
        list_jobs_resp = jobs_client.list_jobs(
            page_token=next_page_token, page_size=PAGE_SIZE
        )
        num_jobs_checked += len(list_jobs_resp.jobs)

        # First job on this page whose state is one of `STATUSES`, if any
        first_matching_job = next(
            (j for j in list_jobs_resp.jobs if j.status.state in STATUSES), None
        )

        next_page_token = list_jobs_resp.next_page_token
        # Stop on a match, or when the service reports no further pages.
        # Requesting again with an empty token would not advance the listing.
        if first_matching_job is not None or not next_page_token:
            break

    if first_matching_job is not None:
        logger.info(
            f"Most recent job with status in {STATUS_STRS} is {first_matching_job}."
        )
    else:
        # Report the number of jobs actually inspected, which is smaller than
        # NUM_PAGES_TO_CHECK * PAGE_SIZE when the listing ends early.
        logger.info(
            f"No job with status in {STATUS_STRS} found in recent "
            + f"{num_jobs_checked} jobs."
        )
92 changes: 92 additions & 0 deletions examples/jobs/poll_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Example script for polling a job from the Jobs API."""

import datetime
import logging
import os
import sys
import time

import yaml

from tamr.api.v1beta1.jobs_pb2 import Job, JobState
from tamr_sdk.api_client import TamrApiClient
from tamr_sdk.jobs.jobs_client import JobsClient

# ID of the job that is polled when this file is run as a script. The asterisks
# look like a placeholder -- replace with a real job ID before running.
JOB_ID = "job_****************"


def calculate_runtime(job: Job) -> datetime.timedelta:
    """Calculates runtime for a job.

    Assumes current status of job is "DONE". If job is not done, returns the
    duration between start of job and beginning of job's current status.

    The start of the job is read from the last entry of `job.status_history`;
    the end is the start time of the job's current status. If the job has no
    status history, a zero duration is returned.

    Args:
        job: Job object

    Returns:
        time delta object representing job duration
    """
    if not job.status_history:
        # No earlier state to measure from.
        return datetime.timedelta(0)

    stop = job.status.state_start_time
    start = job.status_history[-1].state_start_time
    # Subtract the integer fields before any float conversion. Adding
    # fractional nanoseconds to epoch-sized second counts in floating point
    # loses sub-microsecond precision.
    return datetime.timedelta(
        seconds=stop.seconds - start.seconds,
        microseconds=(stop.nanos - start.nanos) / 1000,
    )


def poll_job(
    *,
    jobs_client: JobsClient,
    job_id: str,
    logger: logging.Logger,
    polling_interval_sec: int = 5,
) -> None:
    """Wait for a job to reach the DONE state, then log its runtime.

    The job is fetched repeatedly, sleeping between fetches, until its state is
    `JobState.DONE`. The runtime is then logged at INFO level, and any error
    message attached to the job's status is logged at WARNING level.

    Args:
        jobs_client: Client instance associated with Tamr Cloud jobs service
        job_id: job_id string (e.g. 'job_*********')
        logger: logging instance
        polling_interval_sec: how frequently to re-check job status
    """
    # Fetch once up front, then keep re-fetching until the job is done
    current = jobs_client.get_job(job_id)
    while current.status.state != JobState.DONE:
        time.sleep(polling_interval_sec)
        current = jobs_client.get_job(job_id)

    # Report the outcome of the finished job
    elapsed = calculate_runtime(current)
    logger.info(f"Job '{job_id}' finished in {elapsed}.")

    error_message = current.status.error.message
    if error_message:
        logger.warning(f"Job '{job_id}' raised error: {error_message}.")


if __name__ == "__main__":
    # Root logger writes INFO-and-above records to stdout
    logger = logging.getLogger()
    logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.setLevel("INFO")

    # Load Tamr Cloud settings from config.yaml, one directory above this script
    script_dir = os.path.dirname(os.path.realpath(__file__))
    with open(os.path.join(script_dir, "..", "config.yaml")) as config_file:
        config = yaml.safe_load(config_file)

    # Build the Tamr Cloud client and obtain its jobs client
    host = config["tamr_cloud_host"]
    api_key_entry = ("x-api-key", config["tamr_api_key"])
    tamr_client = TamrApiClient(host, [api_key_entry], grpc_stack_trace=True)
    jobs_client = tamr_client.jobs()
    logger.info("Client initialization complete.")

    # Poll the configured job until it is done
    poll_job(jobs_client=jobs_client, job_id=JOB_ID, logger=logger)
26 changes: 0 additions & 26 deletions examples/list_jobs.py

This file was deleted.

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
google-api-python-client==2.126.0
grpcio==1.62.2
pyyaml
typing_extensions
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ sphinx==7.3.7
sphinx-autodoc-typehints==2.1.0
sphinx-rtd-theme==2.0.0
types-protobuf
types-PyYAML
types-setuptools