diff --git a/.gitignore b/.gitignore index 6f97ca1..8a17c41 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ build *.pyc +.idea diff --git a/asyncdynamo/__init__.py b/asyncdynamo/__init__.py index 8a435f0..47914da 100644 --- a/asyncdynamo/__init__.py +++ b/asyncdynamo/__init__.py @@ -1,5 +1,5 @@ #!/bin/env python -# +# # Copyright 2010 bit.ly # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,7 +25,7 @@ raise ImportError("tornado library not installed. Install tornado. https://github.com/facebook/tornado") try: import boto - assert tuple(map(int,boto.Version.split('.'))) >= (2,3,0), "Boto >= 2.3.0 required." + assert tuple(map(int,boto.Version.split('.'))) >= (2,39,0), "Boto >= 2.39.0 required." except ImportError: raise ImportError("boto library not installed. Install boto. https://github.com/boto/boto") diff --git a/asyncdynamo/async_aws_sts.py b/asyncdynamo/async_aws_sts.py index 9f80feb..3282802 100644 --- a/asyncdynamo/async_aws_sts.py +++ b/asyncdynamo/async_aws_sts.py @@ -1,5 +1,5 @@ #!/bin/env python -# +# # Copyright 2012 bit.ly # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -38,12 +38,12 @@ class AsyncAwsSts(STSConnection): ''' Class that manages session tokens. Users of AsyncDynamoDB should not need to worry about what goes on here. - + Usage: Keep an instance of this class (though it should be cheap to re instantiate) and periodically call get_session_token to get a new Credentials object when, say, your session token expires ''' - + def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, is_secure=True, port=None, proxy=None, proxy_port=None, proxy_user=None, proxy_pass=None, debug=0, @@ -55,7 +55,7 @@ def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, proxy_user, proxy_pass, debug, https_connection_factory, region, path, converter) self.http_client = AsyncHTTPClient(io_loop=ioloop) - + def get_session_token(self, callback): ''' Gets a new Credentials object with a session token, using this @@ -63,16 +63,16 @@ def get_session_token(self, callback): or else a boto.exception.BotoServerError ''' return self.get_object('GetSessionToken', {}, Credentials, verb='POST', callback=callback) - + def get_object(self, action, params, cls, path="/", parent=None, verb="GET", callback=None): ''' Get an instance of `cls` using `action` ''' if not parent: parent = self - self.make_request(action, params, path, verb, + self.make_request(action, params, path, verb, functools.partial(self._finish_get_object, callback=callback, parent=parent, cls=cls)) - + def _finish_get_object(self, response_body, callback, cls=None, parent=None, error=None): ''' Process the body returned by STS. If an error is present, convert from a tornado error @@ -88,27 +88,29 @@ def _finish_get_object(self, response_body, callback, cls=None, parent=None, err h = boto.handler.XmlHandler(obj, parent) xml.sax.parseString(response_body, h) return callback(obj) - + def make_request(self, action, params={}, path='/', verb='GET', callback=None): ''' Make an async request. This handles the logic of translating from boto params to a tornado request obj, issuing the request, and passing back the body. - + The callback should operate on the body of the response, and take an optional error argument that will be a tornado error ''' - request = HTTPRequest('https://%s' % self.host, + request = HTTPRequest('https://%s' % self.host, method=verb) request.params = params request.auth_path = '/' # need this for auth request.host = self.host # need this for auth + request.port = 443 + request.protocol = self.protocol if action: request.params['Action'] = action if self.APIVersion: request.params['Version'] = self.APIVersion self._auth_handler.add_auth(request) # add signature self.http_client.fetch(request, functools.partial(self._finish_make_request, callback=callback)) - + def _finish_make_request(self, response, callback): if response.error: return callback(response.body, error=response.error) diff --git a/asyncdynamo/asyncdynamo.py b/asyncdynamo/asyncdynamo.py index b1364d6..f650a0f 100644 --- a/asyncdynamo/asyncdynamo.py +++ b/asyncdynamo/asyncdynamo.py @@ -1,5 +1,5 @@ #!/bin/env python -# +# # Copyright 2012 bit.ly # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -28,10 +28,11 @@ from collections import deque import time import logging +from urlparse import urlparse from boto.connection import AWSAuthConnection from boto.exception import DynamoDBResponseError -from boto.auth import HmacAuthV3HTTPHandler +from boto.auth import HmacAuthV4Handler from boto.provider import Provider from async_aws_sts import AsyncAwsSts, InvalidClientTokenIdError @@ -41,12 +42,12 @@ class AsyncDynamoDB(AWSAuthConnection): """ The main class for asynchronous connections to DynamoDB. - + The user should maintain one instance of this class (though more than one is ok), parametrized with the user's access key and secret key. Make calls with make_request or the helper methods, and AsyncDynamoDB will maintain session tokens in the background. - - + + As in Boto Layer1: "This is the lowest-level interface to DynamoDB. Methods at this layer map directly to API requests and parameters to the methods @@ -55,52 +56,72 @@ class AsyncDynamoDB(AWSAuthConnection): All responses are direct decoding of the JSON response bodies to Python data structures via the json or simplejson modules." """ - + DefaultHost = 'dynamodb.us-east-1.amazonaws.com' """The default DynamoDB API endpoint to connect to.""" - + ServiceName = 'DynamoDB' """The name of the Service""" - - Version = '20111205' + + Version = '20120810' """DynamoDB API version.""" - + ThruputError = "ProvisionedThroughputExceededException" """The error response returned when provisioned throughput is exceeded""" - + ExpiredSessionError = 'com.amazon.coral.service#ExpiredTokenException' """The error response returned when session token has expired""" - + UnrecognizedClientException = 'com.amazon.coral.service#UnrecognizedClientException' '''Another error response that is possible with a bad session token''' - + def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, is_secure=True, port=None, proxy=None, proxy_port=None, - host=None, debug=0, session_token=None, + host=None, debug=0, session_token=None, endpoint=None, authenticate_requests=True, validate_cert=True, max_sts_attempts=3, ioloop=None): if not host: host = self.DefaultHost + if endpoint is not None: + self.url = endpoint + parse_url = urlparse(self.url) + self.host = parse_url.hostname + self.port = parse_url.port + self.protocol = parse_url.scheme + else: + self.protocol = 'https' if is_secure else 'http' + self.host = host + self.port = port + + url = '{0}://{1}'.format(self.protocol, self.host) + + if self.port: + url += ':{}'.format(self.port) + + self.url = url self.validate_cert = validate_cert - self.authenticate_requests = authenticate_requests - AWSAuthConnection.__init__(self, host, + self.authenticate_requests = authenticate_requests + AWSAuthConnection.__init__(self, self.host, aws_access_key_id, aws_secret_access_key, - is_secure, port, proxy, proxy_port, - debug=debug, security_token=session_token) + is_secure, self.port, proxy, proxy_port, + debug=debug, security_token=session_token, + validate_certs=self.validate_cert) self.ioloop = ioloop or IOLoop.instance() self.http_client = AsyncHTTPClient(io_loop=self.ioloop) self.pending_requests = deque() - self.sts = AsyncAwsSts(aws_access_key_id, aws_secret_access_key, ioloop=self.ioloop) + self.sts = AsyncAwsSts(aws_access_key_id, + aws_secret_access_key, + is_secure, self.port, proxy, proxy_port) assert (isinstance(max_sts_attempts, int) and max_sts_attempts >= 0) self.max_sts_attempts = max_sts_attempts - + def _init_session_token_cb(self, error=None): if error: logging.warn("Unable to get session token: %s" % error) - + def _required_auth_capability(self): - return ['hmac-v3-http'] - + return ['hmac-v4'] + def _update_session_token(self, callback, attempts=0, bypass_lock=False): ''' Begins the logic to get a new session token. Performs checks to ensure @@ -113,14 +134,14 @@ def _update_session_token(self, callback, attempts=0, bypass_lock=False): self.provider.security_token = PENDING_SESSION_TOKEN_UPDATE # invalidate the current security token return self.sts.get_session_token( functools.partial(self._update_session_token_cb, callback=callback, attempts=attempts)) - + def _update_session_token_cb(self, creds, provider='aws', callback=None, error=None, attempts=0): ''' Callback to use with `async_aws_sts`. The 'provider' arg is a bit misleading, it is a relic from boto and should probably be left to its default. This will take the new Credentials obj from `async_aws_sts.get_session_token()` and use it to update self.provider, and then will clear the deque of pending requests. - + A callback is optional. If provided, it must be callable without any arguments, but also accept an optional error argument that will be an instance of BotoServerError. ''' @@ -151,21 +172,21 @@ def raise_error(): creds.secret_key, creds.session_token) # force the correct auth, with the new provider - self._auth_handler = HmacAuthV3HTTPHandler(self.host, None, self.provider) + self._auth_handler = HmacAuthV4Handler(self.host, None, self.provider) while self.pending_requests: request = self.pending_requests.pop() request() if callable(callback): return callback() - + def make_request(self, action, body='', callback=None, object_hook=None): ''' Make an asynchronous HTTP request to DynamoDB. Callback should operate on the decoded json response (with object hook applied, of course). It should also accept an error argument, which will be a boto.exception.DynamoDBResponseError. - + If there is not a valid session token, this method will ensure that a new one is fetched - and cache the request when it is retrieved. + and cache the request when it is retrieved. ''' this_request = functools.partial(self.make_request, action=action, body=body, callback=callback,object_hook=object_hook) @@ -187,18 +208,22 @@ def cb_for_update(error=None): self.Version, action), 'Content-Type' : 'application/x-amz-json-1.0', 'Content-Length' : str(len(body))} - request = HTTPRequest('https://%s' % self.host, + request = HTTPRequest(self.url, method='POST', headers=headers, body=body, validate_cert=self.validate_cert) request.path = '/' # Important! set the path variable for signing by boto (<2.7). '/' is the path for all dynamodb requests request.auth_path = '/' # Important! set the auth_path variable for signing by boto(>2.7). '/' is the path for all dynamodb requests + request.params = {} + request.port = self.port + request.protocol = self.protocol + request.host = self.host if self.authenticate_requests: self._auth_handler.add_auth(request) # add signature to headers of the request self.http_client.fetch(request, functools.partial(self._finish_make_request, callback=callback, orig_request=this_request, token_used=self.provider.security_token, object_hook=object_hook)) # bam! - + def _finish_make_request(self, response, callback, orig_request, token_used, object_hook=None): ''' Check for errors and decode the json response (in the tornado response body), then pass on to orig callback. @@ -219,7 +244,7 @@ def _finish_make_request(self, response, callback, orig_request, token_used, obj return orig_request() # make_request will handle logic to get a new token if needed, and queue until it is fetched else: # because some errors are benign, include the response when an error is passed - return callback(json_response, error=DynamoDBResponseError(response.error.code, + return callback(json_response, error=DynamoDBResponseError(response.error.code, response.error.message, json_response)) if json_response is None: @@ -233,10 +258,10 @@ def get_item(self, table_name, key, callback, attributes_to_get=None, ''' Return a set of attributes for an item that matches the supplied key. - + The callback should operate on a dict representing the decoded response from DynamoDB (using the object_hook, if supplied) - + :type table_name: str :param table_name: The name of the table to delete. @@ -261,12 +286,12 @@ def get_item(self, table_name, key, callback, attributes_to_get=None, data['ConsistentRead'] = True return self.make_request('GetItem', body=json.dumps(data), callback=callback, object_hook=object_hook) - + def batch_get_item(self, request_items, callback): """ Return a set of attributes for a multiple items in multiple tables using their primary keys. - + The callback should operate on a dict representing the decoded response from DynamoDB (using the object_hook, if supplied) @@ -277,7 +302,7 @@ def batch_get_item(self, request_items, callback): data = {'RequestItems' : request_items} json_input = json.dumps(data) self.make_request('BatchGetItem', json_input, callback) - + def put_item(self, table_name, item, callback, expected=None, return_values=None, object_hook=None): ''' Create a new item or replace an old item with a new @@ -286,7 +311,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None key, the new item will completely replace the old item. You can perform a conditional put by specifying an expected rule. - + The callback should operate on a dict representing the decoded response from DynamoDB (using the object_hook, if supplied) @@ -306,7 +331,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None name-value pairs before then were changed. Possible values are: None or 'ALL_OLD'. If 'ALL_OLD' is specified and the item is overwritten, the content - of the old item is returned. + of the old item is returned. ''' data = {'TableName' : table_name, 'Item' : item} @@ -317,7 +342,7 @@ def put_item(self, table_name, item, callback, expected=None, return_values=None json_input = json.dumps(data) return self.make_request('PutItem', json_input, callback=callback, object_hook=object_hook) - + def query(self, table_name, hash_key_value, callback, range_key_conditions=None, attributes_to_get=None, limit=None, consistent_read=False, scan_index_forward=True, exclusive_start_key=None, @@ -326,7 +351,7 @@ def query(self, table_name, hash_key_value, callback, range_key_conditions=None, Perform a query of DynamoDB. This version is currently punting and expecting you to provide a full and correct JSON body which is passed as is to DynamoDB. - + The callback should operate on a dict representing the decoded response from DynamoDB (using the object_hook, if supplied) diff --git a/example/create_table.json b/example/create_table.json new file mode 100644 index 0000000..59c7dcb --- /dev/null +++ b/example/create_table.json @@ -0,0 +1,49 @@ +{ + "AttributeDefinitions": [ + { + "AttributeName": "ForumName", + "AttributeType": "S" + }, + { + "AttributeName": "Subject", + "AttributeType": "S" + }, + { + "AttributeName": "LastPostDateTime", + "AttributeType": "S" + } + ], + "TableName": "Thread", + "KeySchema": [ + { + "AttributeName": "ForumName", + "KeyType": "HASH" + }, + { + "AttributeName": "Subject", + "KeyType": "RANGE" + } + ], + "LocalSecondaryIndexes": [ + { + "IndexName": "LastPostIndex", + "KeySchema": [ + { + "AttributeName": "ForumName", + "KeyType": "HASH" + }, + { + "AttributeName": "LastPostDateTime", + "KeyType": "RANGE" + } + ], + "Projection": { + "ProjectionType": "KEYS_ONLY" + } + } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/example/example.py b/example/example.py new file mode 100644 index 0000000..1294fb7 --- /dev/null +++ b/example/example.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +import tornado.ioloop +import tornado.web +from asyncdynamo import asyncdynamo +from tornado import gen +from tornado.web import asynchronous + + +DB = None + + +def db(): + global DB + if DB is not None: + return DB + + DB = asyncdynamo.AsyncDynamoDB(aws_access_key_id='your_aws_access_key_id', + aws_secret_access_key='your_aws_secret_access_key', + endpoint='http://localhost:8000') + return DB + + +@gen.coroutine +def create_table(callback): + json_open = open('create_table.json') + db().make_request(action='CreateTable', body=json_open.read(), callback=callback) + + +@gen.coroutine +def put_items(callback): + json_open = open('put_items.json') + db().make_request(action='PutItem', body=json_open.read(), callback=callback) + + +@gen.coroutine +def get_items(callback): + json_open = open('get_items.json') + db().make_request(action='GetItem', body=json_open.read(), callback=callback) + + +class MainHandler(tornado.web.RequestHandler): + @asynchronous + def post(self): + create_table(self.callback) + + def callback(self, res, error=None): + if error is not None: + self.finish({"error": error}) + self.finish({"data": res}) + + @asynchronous + def put(self): + put_items(self.callback) + + @asynchronous + def get(self): + get_items(self.callback) + + +def make_app(): + return tornado.web.Application([ + (r"/", MainHandler), + ]) + +if __name__ == "__main__": + app = make_app() + app.listen(8888) + tornado.ioloop.IOLoop.current().start() diff --git a/example/get_items.json b/example/get_items.json new file mode 100644 index 0000000..cee69d3 --- /dev/null +++ b/example/get_items.json @@ -0,0 +1,14 @@ +{ + "TableName": "Thread", + "Key": { + "ForumName": { + "S": "Amazon DynamoDB" + }, + "Subject": { + "S": "How do I update multiple items?" + } + }, + "ProjectionExpression":"LastPostDateTime, Message, Tags", + "ConsistentRead": true, + "ReturnConsumedCapacity": "TOTAL" +} diff --git a/example/put_items.json b/example/put_items.json new file mode 100644 index 0000000..87e7ebd --- /dev/null +++ b/example/put_items.json @@ -0,0 +1,28 @@ +{ + "TableName": "Thread", + "Item": { + "LastPostDateTime": { + "S": "201303190422" + }, + "Tags": { + "SS": ["Update","Multiple Items","HelpMe"] + }, + "ForumName": { + "S": "Amazon DynamoDB" + }, + "Message": { + "S": "I want to update multiple items in a single API call. What's the best way to do that?" + }, + "Subject": { + "S": "How do I update multiple items?" + }, + "LastPostedBy": { + "S": "fred@example.com" + } + }, + "ConditionExpression": "ForumName <> :f and Subject <> :s", + "ExpressionAttributeValues": { + ":f": {"S": "Amazon DynamoDB"}, + ":s": {"S": "How do I update multiple items?"} + } +}