From 1d4717d4e974954e776267951d1d2f24e8c08025 Mon Sep 17 00:00:00 2001 From: Josh Bird Date: Wed, 13 Jun 2018 17:06:03 -0400 Subject: [PATCH 1/6] Allow arbitrary host/port/encryption --- requirements.txt | 1 + waterbutler/core/utils.py | 18 ++-- waterbutler/providers/s3/metadata.py | 26 +++-- waterbutler/providers/s3/provider.py | 152 ++++++++++++++++----------- 4 files changed, 115 insertions(+), 82 deletions(-) diff --git a/requirements.txt b/requirements.txt index 63e9e8888..e844bc12b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ agent==0.1.2 aiohttp==0.18.2 git+https://github.com/felliott/boto.git@feature/gen-url-query-params-6#egg=boto +boto3==1.7.36 celery==3.1.17 furl==0.4.2 google-auth==1.4.1 diff --git a/waterbutler/core/utils.py b/waterbutler/core/utils.py index 2e953418e..8be49cbab 100644 --- a/waterbutler/core/utils.py +++ b/waterbutler/core/utils.py @@ -105,15 +105,17 @@ async def send_signed_request(method, url, payload): )) -def normalize_datetime(date_string): - if date_string is None: +def normalize_datetime(date): + if date is None: return None - parsed_datetime = dateutil.parser.parse(date_string) - if not parsed_datetime.tzinfo: - parsed_datetime = parsed_datetime.replace(tzinfo=pytz.UTC) - parsed_datetime = parsed_datetime.astimezone(tz=pytz.UTC) - parsed_datetime = parsed_datetime.replace(microsecond=0) - return parsed_datetime.isoformat() + if isinstance(date, str): + date = dateutil.parser.parse(date) + if not date.tzinfo: + date = date.replace(tzinfo=pytz.UTC) + date = date.astimezone(tz=pytz.UTC) + date = date.replace(microsecond=0) + return date.isoformat() + class ZipStreamGenerator: diff --git a/waterbutler/providers/s3/metadata.py b/waterbutler/providers/s3/metadata.py index bcdbdea4a..a019833a4 100644 --- a/waterbutler/providers/s3/metadata.py +++ b/waterbutler/providers/s3/metadata.py @@ -1,6 +1,7 @@ import os from waterbutler.core import metadata +from waterbutler.core import utils class S3Metadata(metadata.BaseMetadata): @@ -18,9 +19,11 @@ class S3FileMetadataHeaders(S3Metadata, metadata.BaseFileMetadata): def __init__(self, path, headers): self._path = path + self.obj = headers + self._etag = None # Cast to dict to clone as the headers will # be destroyed when the request leaves scope - super().__init__(dict(headers)) + super().__init__(headers) @property def path(self): @@ -28,15 +31,15 @@ def path(self): @property def size(self): - return self.raw['CONTENT-LENGTH'] + return self.obj.content_length @property def content_type(self): - return self.raw['CONTENT-TYPE'] + return self.obj.content_type @property def modified(self): - return self.raw['LAST-MODIFIED'] + return utils.normalize_datetime(self.obj.last_modified) @property def created_utc(self): @@ -44,16 +47,19 @@ def created_utc(self): @property def etag(self): - return self.raw['ETAG'].replace('"', '') + if self._etag: + return self._etag + else: + self._etag = self.obj.e_tag.replace('"', '') + return self._etag @property def extra(self): - md5 = self.raw['ETAG'].replace('"', '') return { - 'md5': md5, - 'encryption': self.raw.get('X-AMZ-SERVER-SIDE-ENCRYPTION', ''), + 'md5': self.etag, + 'encryption': self.obj.server_side_encryption, 'hashes': { - 'md5': md5, + 'md5': self.etag, }, } @@ -70,7 +76,7 @@ def size(self): @property def modified(self): - return self.raw['LastModified'] + return self.raw['LastModified'].isoformat() @property def created_utc(self): diff --git a/waterbutler/providers/s3/provider.py b/waterbutler/providers/s3/provider.py index b09c7d07a..35d1acb55 
100644 --- a/waterbutler/providers/s3/provider.py +++ b/waterbutler/providers/s3/provider.py @@ -14,6 +14,10 @@ from boto.s3.connection import S3Connection from boto.s3.connection import OrdinaryCallingFormat +import boto3 +from botocore.client import Config +from botocore.exceptions import ClientError + from waterbutler.core import streams from waterbutler.core import provider from waterbutler.core import exceptions @@ -45,7 +49,7 @@ class S3Provider(provider.BaseProvider): NAME = 's3' def __init__(self, auth, credentials, settings): - """ + """Initialize S3Provider .. note:: Neither `S3Connection#__init__` nor `S3Connection#get_bucket` @@ -57,10 +61,30 @@ def __init__(self, auth, credentials, settings): """ super().__init__(auth, credentials, settings) - self.connection = S3Connection(credentials['access_key'], - credentials['secret_key'], calling_format=OrdinaryCallingFormat()) + self.s3 = boto3.resource( + 's3', + endpoint_url='http://{}:{}'.format( + credentials['host'], + credentials['port'] + ), + aws_access_key_id=credentials['access_key'], + aws_secret_access_key=credentials['secret_key'], + config=Config(signature_version='s3v4'), + region_name='us-east-1' + ) + + self.connection = S3Connection( + credentials['access_key'], + credentials['secret_key'], + calling_format=OrdinaryCallingFormat(), + host=credentials['host'], + port=credentials['port'], + is_secure=False + ) + self.bucket_name = settings['bucket'] self.bucket = self.connection.get_bucket(settings['bucket'], validate=False) self.encrypt_uploads = self.settings.get('encrypt_uploads', False) + self.encrypt_uploads = False self.region = None async def validate_v1_path(self, path, **kwargs): @@ -347,27 +371,29 @@ async def revisions(self, path, **kwargs): :rtype list: """ await self._check_region() - query_params = {'prefix': path.path, 'delimiter': '/', 'versions': ''} url = functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters=query_params) - resp = await self.make_request( - 'GET', - url, - params=query_params, - expects=(200, ), - throws=exceptions.MetadataError, - ) - content = await resp.read() - versions = xmltodict.parse(content)['ListVersionsResult'].get('Version') or [] + try: + resp = await self.make_request( + 'GET', + url, + params=query_params, + expects=(200, ), + throws=exceptions.MetadataError, + ) + content = await resp.read() + versions = xmltodict.parse(content)['ListVersionsResult'].get('Version') or [] - if isinstance(versions, dict): - versions = [versions] + if isinstance(versions, dict): + versions = [versions] - return [ - S3Revision(item) - for item in versions - if item['Key'] == path.path - ] + return [ + S3Revision(item) + for item in versions + if item['Key'] == path.path + ] + except: + return [] async def metadata(self, path, revision=None, **kwargs): """Get Metadata about the requested file or folder @@ -394,9 +420,10 @@ async def create_folder(self, path, folder_precheck=True, **kwargs): if (await self.exists(path)): raise exceptions.FolderNamingConflict(path.name) + url = functools.partial(self.bucket.new_key(path.path).generate_url, settings.TEMP_URL_SECS, 'PUT') async with self.request( 'PUT', - functools.partial(self.bucket.new_key(path.path).generate_url, settings.TEMP_URL_SECS, 'PUT'), + url, skip_auto_headers={'CONTENT-TYPE'}, expects=(200, 201), throws=exceptions.CreateFolderError @@ -406,52 +433,52 @@ async def create_folder(self, path, folder_precheck=True, **kwargs): async def _metadata_file(self, path, revision=None): await 
self._check_region() - if revision == 'Latest': - revision = None - resp = await self.make_request( - 'HEAD', - functools.partial( - self.bucket.new_key(path.path).generate_url, - settings.TEMP_URL_SECS, - 'HEAD', - query_parameters={'versionId': revision} if revision else None - ), - expects=(200, ), - throws=exceptions.MetadataError, - ) - await resp.release() - return S3FileMetadataHeaders(path.path, resp.headers) + if ( + revision == 'Latest' or + revision == '' or + not revision + ): + obj = self.s3.Object( + self.bucket.name, + path.path + ) + else: + obj = self.s3.ObjectVersion( + self.bucket.name, + path.path, + revision + ) + try: + obj.load() + except ClientError as err: + if err.response['Error']['Code'] == '404': + raise exceptions.NotFoundError(str(path)) + else: + raise err - async def _metadata_folder(self, path): - await self._check_region() + return S3FileMetadataHeaders(path.path, obj) - params = {'prefix': path.path, 'delimiter': '/'} - resp = await self.make_request( - 'GET', - functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters=params), - params=params, - expects=(200, ), - throws=exceptions.MetadataError, + async def _metadata_folder(self, path): + """Get metadata about the contents of a bucket. This is either the + contents at the root of the bucket, or a folder has + been selected as a prefix by the user + """ + bucket = self.s3.Bucket(self.bucket_name) + result = bucket.meta.client.list_objects( + Bucket=bucket.name, + Prefix=path.path, + Delimiter='/' ) - - contents = await resp.read() - - parsed = xmltodict.parse(contents, strip_whitespace=False)['ListBucketResult'] - - contents = parsed.get('Contents', []) - prefixes = parsed.get('CommonPrefixes', []) + prefixes = result.get('CommonPrefixes', []) + contents = result.get('Contents', []) if not contents and not prefixes and not path.is_root: # If contents and prefixes are empty then this "folder" # must exist as a key with a / at the end of the name # if the path is root there is no need to test if it exists - resp = await self.make_request( - 'HEAD', - functools.partial(self.bucket.new_key(path.path).generate_url, settings.TEMP_URL_SECS, 'HEAD'), - expects=(200, ), - throws=exceptions.MetadataError, - ) - await resp.release() + + obj = self.s3.Object(self.bucket_name, path.path) + obj.load() if isinstance(contents, dict): contents = [contents] @@ -459,10 +486,7 @@ async def _metadata_folder(self, path): if isinstance(prefixes, dict): prefixes = [prefixes] - items = [ - S3FolderMetadata(item) - for item in prefixes - ] + items = [S3FolderMetadata(item) for item in prefixes] for content in contents: if content['Key'] == path.path: @@ -486,7 +510,7 @@ async def _check_region(self): Region Naming: http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region """ - if self.region is None: + if self.region is "Chewbacca": self.region = await self._get_bucket_region() if self.region == 'EU': self.region = 'eu-west-1' From cb9d166a2411fd47fa16b15cf4ba7c09b49180cd Mon Sep 17 00:00:00 2001 From: Josh Bird Date: Fri, 13 Jul 2018 12:33:01 -0400 Subject: [PATCH 2/6] Use boto3 for s3 operations boto2 causes lots of problems, and trying to use both is trying. Use boto3 for all s3 operations. 
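
The core of this series is pointing a boto3 resource at an arbitrary endpoint built from the stored credentials. A minimal sketch of that wiring, using the credential keys already present in the diff (`host`, `port`, `encrypted`, `access_key`, `secret_key`); the `Config` signature version is an assumption here, since the diff first adds it and later comments it out:

    # Sketch only: build an S3 resource against a caller-supplied endpoint.
    # The credential keys mirror the diff; signature_version is an assumption.
    import boto3
    from botocore.client import Config

    def make_s3_resource(credentials):
        scheme = 'https' if credentials['encrypted'] else 'http'
        endpoint = '{}://{}:{}'.format(scheme, credentials['host'], credentials['port'])
        return boto3.resource(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=credentials['access_key'],
            aws_secret_access_key=credentials['secret_key'],
            config=Config(signature_version='s3v4'),  # assumption: may need adjusting per backend
        )

Against AWS itself the endpoint override can simply be omitted; the point of the change is that non-AWS, S3-compatible backends become reachable.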
--- waterbutler/providers/s3/provider.py | 103 +++++++++++++-------------- waterbutler/providers/s3/streams.py | 24 +++++++ 2 files changed, 72 insertions(+), 55 deletions(-) create mode 100644 waterbutler/providers/s3/streams.py diff --git a/waterbutler/providers/s3/provider.py b/waterbutler/providers/s3/provider.py index 35d1acb55..123d16ced 100644 --- a/waterbutler/providers/s3/provider.py +++ b/waterbutler/providers/s3/provider.py @@ -2,6 +2,7 @@ import hashlib import functools from urllib import parse +import logging import xmltodict @@ -24,12 +25,15 @@ from waterbutler.core.path import WaterButlerPath from waterbutler.providers.s3 import settings +from waterbutler.providers.s3.streams import S3ResponseBodyStream from waterbutler.providers.s3.metadata import S3Revision from waterbutler.providers.s3.metadata import S3FileMetadata from waterbutler.providers.s3.metadata import S3FolderMetadata from waterbutler.providers.s3.metadata import S3FolderKeyMetadata from waterbutler.providers.s3.metadata import S3FileMetadataHeaders +logger = logging.getLogger(__name__) + class S3Provider(provider.BaseProvider): """Provider for Amazon's S3 cloud storage service. @@ -63,14 +67,15 @@ def __init__(self, auth, credentials, settings): self.s3 = boto3.resource( 's3', - endpoint_url='http://{}:{}'.format( + endpoint_url='http{}://{}:{}'.format( + 's' if credentials['encrypted'] else '', credentials['host'], credentials['port'] ), aws_access_key_id=credentials['access_key'], aws_secret_access_key=credentials['secret_key'], - config=Config(signature_version='s3v4'), - region_name='us-east-1' + #config=Config(signature_version='s3v4'), + #region_name='us-east-1' ) self.connection = S3Connection( @@ -88,6 +93,7 @@ def __init__(self, auth, credentials, settings): self.region = None async def validate_v1_path(self, path, **kwargs): + logger.info("Validate") await self._check_region() if path == '/': @@ -96,26 +102,9 @@ async def validate_v1_path(self, path, **kwargs): implicit_folder = path.endswith('/') if implicit_folder: - params = {'prefix': path, 'delimiter': '/'} - resp = await self.make_request( - 'GET', - functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters=params), - params=params, - expects=(200, 404), - throws=exceptions.MetadataError, - ) + self._metadata_folder(path) else: - resp = await self.make_request( - 'HEAD', - functools.partial(self.bucket.new_key(path).generate_url, settings.TEMP_URL_SECS, 'HEAD'), - expects=(200, 404), - throws=exceptions.MetadataError, - ) - - await resp.release() - - if resp.status == 404: - raise exceptions.NotFoundError(str(path)) + self._metadata_file(path) return WaterButlerPath(path) @@ -168,40 +157,34 @@ async def download(self, path, accept_url=False, revision=None, range=None, **kw :rtype: :class:`waterbutler.core.streams.ResponseStreamReader` :raises: :class:`waterbutler.core.exceptions.DownloadError` """ + logger.info("Download") await self._check_region() + get_kwargs = {} + if not path.is_file: raise exceptions.DownloadError('No file specified for download', code=400) - if not revision or revision.lower() == 'latest': - query_parameters = None - else: - query_parameters = {'versionId': revision} + if range: + get_kwargs['Range'] = 'bytes={}-{}'.format('', '') if kwargs.get('displayName'): - response_headers = {'response-content-disposition': 'attachment; filename*=UTF-8\'\'{}'.format(parse.quote(kwargs['displayName']))} + get_kwargs['ResponseContentDisposition'] = 'attachment; 
filename*=UTF-8\'\'{}'.format(parse.quote(kwargs['displayName'])) else: - response_headers = {'response-content-disposition': 'attachment'} + get_kwargs['ResponseContentDisposition'] = 'attachment' - url = functools.partial( - self.bucket.new_key(path.path).generate_url, - settings.TEMP_URL_SECS, - query_parameters=query_parameters, - response_headers=response_headers - ) + if revision: + get_kwargs['VersionId'] = revision - if accept_url: - return url() - - resp = await self.make_request( - 'GET', - url, - range=range, - expects=(200, 206), - throws=exceptions.DownloadError, - ) + try: + res = self.s3.Object( + self.bucket_name, + path.path + ).get(**get_kwargs) + except: + raise exceptions.DownloadError() - return streams.ResponseStreamReader(resp) + return S3ResponseBodyStream(res) async def upload(self, stream, path, conflict='replace', **kwargs): """Uploads the given stream to S3 @@ -211,6 +194,7 @@ async def upload(self, stream, path, conflict='replace', **kwargs): :rtype: dict, bool """ + logger.info("Upload") await self._check_region() path, exists = await self.handle_name_conflict(path, conflict=conflict) @@ -251,6 +235,7 @@ async def delete(self, path, confirm_delete=0, **kwargs): :param str path: The path of the key to delete :param int confirm_delete: Must be 1 to confirm root folder delete """ + logger.info("Delete") await self._check_region() if path.is_root: @@ -288,6 +273,7 @@ async def _delete_folder(self, path, **kwargs): To fully delete an occupied folder, we must delete all of the comprising objects. Amazon provides a bulk delete operation to simplify this. """ + logger.info('delete_folder') await self._check_region() more_to_come = True @@ -373,6 +359,7 @@ async def revisions(self, path, **kwargs): await self._check_region() query_params = {'prefix': path.path, 'delimiter': '/', 'versions': ''} url = functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters=query_params) + logger.info("revisions") try: resp = await self.make_request( 'GET', @@ -401,12 +388,13 @@ async def metadata(self, path, revision=None, **kwargs): :param WaterButlerPath path: The path to a key or folder :rtype: dict or list """ + logger.info("metadata") await self._check_region() if path.is_dir: - return (await self._metadata_folder(path)) + return (await self._metadata_folder(path.path)) - return (await self._metadata_file(path, revision=revision)) + return (await self._metadata_file(path.path, revision=revision)) async def create_folder(self, path, folder_precheck=True, **kwargs): """ @@ -432,6 +420,7 @@ async def create_folder(self, path, folder_precheck=True, **kwargs): async def _metadata_file(self, path, revision=None): await self._check_region() + logger.info("metadata_file") if ( revision == 'Latest' or @@ -440,44 +429,47 @@ async def _metadata_file(self, path, revision=None): ): obj = self.s3.Object( self.bucket.name, - path.path + path ) else: obj = self.s3.ObjectVersion( self.bucket.name, - path.path, + path, revision ) try: obj.load() except ClientError as err: if err.response['Error']['Code'] == '404': - raise exceptions.NotFoundError(str(path)) + raise exceptions.NotFoundError(path) else: raise err - return S3FileMetadataHeaders(path.path, obj) + return S3FileMetadataHeaders(path, obj) async def _metadata_folder(self, path): """Get metadata about the contents of a bucket. 
This is either the contents at the root of the bucket, or a folder has been selected as a prefix by the user """ + logger.info("metadata_folder") bucket = self.s3.Bucket(self.bucket_name) result = bucket.meta.client.list_objects( Bucket=bucket.name, - Prefix=path.path, + Prefix=path, Delimiter='/' ) prefixes = result.get('CommonPrefixes', []) contents = result.get('Contents', []) - if not contents and not prefixes and not path.is_root: + if not contents and not prefixes and not path == "": + import pdb + pdb.set_trace() # If contents and prefixes are empty then this "folder" # must exist as a key with a / at the end of the name # if the path is root there is no need to test if it exists - obj = self.s3.Object(self.bucket_name, path.path) + obj = self.s3.Object(self.bucket_name, path) obj.load() if isinstance(contents, dict): @@ -489,7 +481,7 @@ async def _metadata_folder(self, path): items = [S3FolderMetadata(item) for item in prefixes] for content in contents: - if content['Key'] == path.path: + if content['Key'] == path: continue if content['Key'].endswith('/'): @@ -510,6 +502,7 @@ async def _check_region(self): Region Naming: http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region """ + logger.info("check_region") if self.region is "Chewbacca": self.region = await self._get_bucket_region() if self.region == 'EU': diff --git a/waterbutler/providers/s3/streams.py b/waterbutler/providers/s3/streams.py new file mode 100644 index 000000000..2430f256c --- /dev/null +++ b/waterbutler/providers/s3/streams.py @@ -0,0 +1,24 @@ +import asyncio + +from botocore.response import StreamingBody + +from waterbutler.core.streams.base import BaseStream + + +class S3ResponseBodyStream(BaseStream): + def __init__(self, data): + super().__init__() + + if not isinstance(data['Body'], StreamingBody): + raise TypeError('Data must be a StreamingBody, found {!r}'.format(type(data['body']))) + + self.content_type = data['ContentType'] + self._size = data['ContentLength'] + self.streaming_body = data['Body'] + + @property + def size(self): + return self._size + + async def _read(self, n=-1): + return self.streaming_body.read(amt=n) From 142464547b528bc31297bd2e042227d035e9e3f4 Mon Sep 17 00:00:00 2001 From: Josh Bird Date: Tue, 17 Jul 2018 17:45:28 -0400 Subject: [PATCH 3/6] Use Boto3 Upgrades boto2 to boto3. This required some implementations of operations to change because of some inflexibility inside of boto - It likes to do it the boto way, but using some of the internals, to generate signed urls for example, is difficult to call in isolation. In the interest of getting it implemented sooner than later, we use the boto way of doing some things rather than making our own requests manually. 
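
For the operations that still go through WaterButler's own HTTP layer, the signed URL now comes from boto3's presigner instead of boto2's `generate_url`. A hedged sketch of the stock call this message refers to (bucket and key names are placeholders; a later patch in this series adds a patched variant that can also fold extra headers into the signature, which the stock call cannot do):

    # Sketch only: generate a presigned GET URL and hand it to any HTTP client.
    import boto3

    client = boto3.client('s3')  # credentials/endpoint come from the provider in practice
    url = client.generate_presigned_url(
        ClientMethod='get_object',
        Params={'Bucket': 'example-bucket', 'Key': 'path/to/key'},
        ExpiresIn=3600,
        HttpMethod='GET',
    )
    # `url` is a plain string, so it can be fetched with WaterButler's make_request.

The awkwardness called out above is that boto3 normally wants to issue the request itself; isolating just the signing step is what the rest of the series works around.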
--- waterbutler/core/streams/http.py | 5 + waterbutler/providers/s3/provider.py | 268 ++++++++++----------------- waterbutler/providers/s3/streams.py | 9 +- 3 files changed, 109 insertions(+), 173 deletions(-) diff --git a/waterbutler/core/streams/http.py b/waterbutler/core/streams/http.py index fb2955344..957e9525c 100644 --- a/waterbutler/core/streams/http.py +++ b/waterbutler/core/streams/http.py @@ -179,6 +179,7 @@ def __init__(self, request, inner): super().__init__() self.inner = inner self.request = request + self.offset = 0 @property def size(self): @@ -187,12 +188,16 @@ def size(self): def at_eof(self): return self.inner.at_eof() + def tell(self): + return self.offset + async def _read(self, size): if self.inner.at_eof(): return b'' if size < 0: return (await self.inner.read(size)) try: + self.offset += size return (await self.inner.readexactly(size)) except asyncio.IncompleteReadError as e: return e.partial diff --git a/waterbutler/providers/s3/provider.py b/waterbutler/providers/s3/provider.py index 123d16ced..d1ca26efa 100644 --- a/waterbutler/providers/s3/provider.py +++ b/waterbutler/providers/s3/provider.py @@ -1,4 +1,5 @@ import os +import itertools import hashlib import functools from urllib import parse @@ -74,8 +75,8 @@ def __init__(self, auth, credentials, settings): ), aws_access_key_id=credentials['access_key'], aws_secret_access_key=credentials['secret_key'], - #config=Config(signature_version='s3v4'), - #region_name='us-east-1' + # config=Config(signature_version='s3v4'), + # region_name='us-east-1' ) self.connection = S3Connection( @@ -94,19 +95,20 @@ def __init__(self, auth, credentials, settings): async def validate_v1_path(self, path, **kwargs): logger.info("Validate") - await self._check_region() + + wb_path = WaterButlerPath(path) if path == '/': - return WaterButlerPath(path) + return wb_path implicit_folder = path.endswith('/') if implicit_folder: - self._metadata_folder(path) + await self._metadata_folder(wb_path.path) else: - self._metadata_file(path) + await self._metadata_file(wb_path.path) - return WaterButlerPath(path) + return wb_path async def validate_path(self, path, **kwargs): return WaterButlerPath(path) @@ -124,7 +126,6 @@ async def intra_copy(self, dest_provider, source_path, dest_path): """Copy key from one S3 bucket to another. The credentials specified in `dest_provider` must have read access to `source.bucket`. """ - await self._check_region() exists = await dest_provider.exists(dest_path) dest_key = dest_provider.bucket.new_key(dest_path.path) @@ -158,7 +159,6 @@ async def download(self, path, accept_url=False, revision=None, range=None, **kw :raises: :class:`waterbutler.core.exceptions.DownloadError` """ logger.info("Download") - await self._check_region() get_kwargs = {} @@ -195,7 +195,6 @@ async def upload(self, stream, path, conflict='replace', **kwargs): :rtype: dict, bool """ logger.info("Upload") - await self._check_region() path, exists = await self.handle_name_conflict(path, conflict=conflict) stream.add_writer('md5', streams.HashStreamWriter(hashlib.md5)) @@ -207,26 +206,41 @@ async def upload(self, stream, path, conflict='replace', **kwargs): if self.encrypt_uploads: headers['x-amz-server-side-encryption'] = 'AES256' - upload_url = functools.partial( - self.bucket.new_key(path.path).generate_url, - settings.TEMP_URL_SECS, - 'PUT', - headers=headers, + resp = self.s3.Object( + self.bucket_name, + path.path + ).put( + Body=(await stream.read()) # NBeeds to calculate hash inside boto - some issue with not implementing buffer api. 
+ ) + + """ + bucket = self.s3.Bucket(self.bucket_name) + sign_url = lambda: bucket.meta.client.generate_presigned_url( + ClientMethod='put_object', + Params={ + 'Bucket': self.bucket_name, + 'Key': path.path, + 'ContentLength': str(stream.size) + }, + ExpiresIn=settings.TEMP_URL_SECS, + HttpMethod='PUT', ) + resp = await self.make_request( 'PUT', - upload_url, + sign_url, data=stream, skip_auto_headers={'CONTENT-TYPE'}, headers=headers, expects=(200, 201, ), throws=exceptions.UploadError, ) + """ + # md5 is returned as ETag header as long as server side encryption is not used. - if stream.writers['md5'].hexdigest != resp.headers['ETag'].replace('"', ''): + if stream.writers['md5'].hexdigest != resp['ETag'].replace('"', ''): raise exceptions.UploadChecksumMismatchError() - await resp.release() return (await self.metadata(path, **kwargs)), not exists async def delete(self, path, confirm_delete=0, **kwargs): @@ -235,9 +249,6 @@ async def delete(self, path, confirm_delete=0, **kwargs): :param str path: The path of the key to delete :param int confirm_delete: Must be 1 to confirm root folder delete """ - logger.info("Delete") - await self._check_region() - if path.is_root: if not confirm_delete == 1: raise exceptions.DeleteError( @@ -246,23 +257,40 @@ async def delete(self, path, confirm_delete=0, **kwargs): ) if path.is_file: - resp = await self.make_request( - 'DELETE', - self.bucket.new_key(path.path).generate_url(settings.TEMP_URL_SECS, 'DELETE'), - expects=(200, 204, ), - throws=exceptions.DeleteError, - ) - await resp.release() + await self._delete_file(path, **kwargs) else: await self._delete_folder(path, **kwargs) + async def _delete_file(self, path, **kwargs): + """Deletes a single object located at a certain key. + + Called from: func: delete if path.is_file + + """ + bucket = self.s3.Bucket(self.bucket_name) + sign_url = lambda: bucket.meta.client.generate_presigned_url( + 'delete_object', + Params={ + 'Bucket': self.bucket_name, + 'Key': path.path + }, + ExpiresIn=settings.TEMP_URL_SECS, + HttpMethod='DELETE', + ) + resp = await self.make_request( + 'DELETE', + sign_url, + expects={200, 204}, + throws=exceptions.DeleteError, + ) + await resp.release() + async def _delete_folder(self, path, **kwargs): """Query for recursive contents of folder and delete in batches of 1000 Called from: func: delete if not path.is_file - Calls: func: self._check_region - func: self.make_request + Calls: func: self.make_request func: self.bucket.generate_url :param *ProviderPath path: Path to be deleted @@ -273,82 +301,16 @@ async def _delete_folder(self, path, **kwargs): To fully delete an occupied folder, we must delete all of the comprising objects. Amazon provides a bulk delete operation to simplify this. 
""" - logger.info('delete_folder') - await self._check_region() - - more_to_come = True - content_keys = [] - query_params = {'prefix': path.path} - marker = None - - while more_to_come: - if marker is not None: - query_params['marker'] = marker - - resp = await self.make_request( - 'GET', - self.bucket.generate_url(settings.TEMP_URL_SECS, 'GET', query_parameters=query_params), - params=query_params, - expects=(200, ), - throws=exceptions.MetadataError, - ) - - contents = await resp.read() - parsed = xmltodict.parse(contents, strip_whitespace=False)['ListBucketResult'] - more_to_come = parsed.get('IsTruncated') == 'true' - contents = parsed.get('Contents', []) - - if isinstance(contents, dict): - contents = [contents] - - content_keys.extend([content['Key'] for content in contents]) - if len(content_keys) > 0: - marker = content_keys[-1] - - # Query against non-existant folder does not return 404 - if len(content_keys) == 0: - raise exceptions.NotFoundError(str(path)) - - while len(content_keys) > 0: - key_batch = content_keys[:1000] - del content_keys[:1000] - - payload = '' - payload += '' - payload += ''.join(map( - lambda x: '{}'.format(xml.sax.saxutils.escape(x)), - key_batch - )) - payload += '' - payload = payload.encode('utf-8') - md5 = compute_md5(BytesIO(payload)) - - query_params = {'delete': ''} - headers = { - 'Content-Length': str(len(payload)), - 'Content-MD5': md5[1], - 'Content-Type': 'text/xml', - } - - # We depend on a customized version of boto that can make query parameters part of - # the signature. - url = functools.partial( - self.bucket.generate_url, - settings.TEMP_URL_SECS, - 'POST', - query_parameters=query_params, - headers=headers, - ) - resp = await self.make_request( - 'POST', - url, - params=query_params, - data=payload, - headers=headers, - expects=(200, 204, ), - throws=exceptions.DeleteError, + for page in self.s3.meta.client.get_paginator('list_objects').paginate( + Bucket=self.bucket_name, + Prefix=path.path + ): + self.s3.meta.client.delete_objects( + Bucket=self.bucket_name, + Delete={ + 'Objects': [{'Key': item['Key']} for item in page['Contents']] + } ) - await resp.release() async def revisions(self, path, **kwargs): """Get past versions of the requested key @@ -356,7 +318,6 @@ async def revisions(self, path, **kwargs): :param str path: The path to a key :rtype list: """ - await self._check_region() query_params = {'prefix': path.path, 'delimiter': '/', 'versions': ''} url = functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters=query_params) logger.info("revisions") @@ -389,10 +350,10 @@ async def metadata(self, path, revision=None, **kwargs): :rtype: dict or list """ logger.info("metadata") - await self._check_region() if path.is_dir: - return (await self._metadata_folder(path.path)) + return (await self._metadata_folder(path.path)) + # # store a hash of these args and the result in redis? return (await self._metadata_file(path.path, revision=revision)) @@ -400,26 +361,27 @@ async def create_folder(self, path, folder_precheck=True, **kwargs): """ :param str path: The path to create a folder at """ - await self._check_region() - + logger.info("Create_folder") WaterButlerPath.validate_folder(path) if folder_precheck: + # We should have already validated the path at this point - we + # should store the value so when we're here we dont make an extra + # request. + # If the folder exists, why do we need to throw? 
the user has asked + # us to make sure a folder with a certain name exists in the + # provider, if it already exsists, can we return Not Modified? if (await self.exists(path)): raise exceptions.FolderNamingConflict(path.name) - url = functools.partial(self.bucket.new_key(path.path).generate_url, settings.TEMP_URL_SECS, 'PUT') - async with self.request( - 'PUT', - url, - skip_auto_headers={'CONTENT-TYPE'}, - expects=(200, 201), - throws=exceptions.CreateFolderError - ): - return S3FolderMetadata({'Prefix': path.path}) + self.s3.Object( + self.bucket_name, + path.path + ).put() + # throws=exceptions.CreateFolderError + return S3FolderMetadata({'Prefix': path.path}) async def _metadata_file(self, path, revision=None): - await self._check_region() logger.info("metadata_file") if ( @@ -454,24 +416,29 @@ async def _metadata_folder(self, path): """ logger.info("metadata_folder") bucket = self.s3.Bucket(self.bucket_name) - result = bucket.meta.client.list_objects( - Bucket=bucket.name, - Prefix=path, - Delimiter='/' - ) + logger.info("LIST_OBJECTS") + result = bucket.meta.client.list_objects(Bucket=bucket.name, Prefix=path, Delimiter='/') prefixes = result.get('CommonPrefixes', []) contents = result.get('Contents', []) - + logger.info(bucket) + logger.info(result) + logger.info(prefixes) + logger.info(contents) if not contents and not prefixes and not path == "": - import pdb - pdb.set_trace() # If contents and prefixes are empty then this "folder" # must exist as a key with a / at the end of the name # if the path is root there is no need to test if it exists obj = self.s3.Object(self.bucket_name, path) - obj.load() - + try: + logger.info("Trying to load object") + logger.info(path) + obj.load() + except ClientError as err: + if err.response['Error']['Code'] == '404': + raise exceptions.NotFoundError(path) + else: + raise err if isinstance(contents, dict): contents = [contents] @@ -490,44 +457,3 @@ async def _metadata_folder(self, path): items.append(S3FileMetadata(content)) return items - - async def _check_region(self): - """Lookup the region via bucket name, then update the host to match. - - Manually constructing the connection hostname allows us to use OrdinaryCallingFormat - instead of SubdomainCallingFormat, which can break on buckets with periods in their name. - The default region, US East (N. Virginia), is represented by the empty string and does not - require changing the host. Ireland is represented by the string 'EU', with the host - parameter 'eu-west-1'. All other regions return the host parameter as the region name. - - Region Naming: http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region - """ - logger.info("check_region") - if self.region is "Chewbacca": - self.region = await self._get_bucket_region() - if self.region == 'EU': - self.region = 'eu-west-1' - - if self.region != '': - self.connection.host = self.connection.host.replace('s3.', 's3-' + self.region + '.', 1) - self.connection._auth_handler = get_auth_handler( - self.connection.host, boto_config, self.connection.provider, self.connection._required_auth_capability()) - - self.metrics.add('region', self.region) - - async def _get_bucket_region(self): - """Bucket names are unique across all regions. 
- - Endpoint doc: - http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlocation.html - """ - resp = await self.make_request( - 'GET', - functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters={'location': ''}), - expects=(200, ), - throws=exceptions.MetadataError, - ) - - contents = await resp.read() - parsed = xmltodict.parse(contents, strip_whitespace=False) - return parsed['LocationConstraint'].get('#text', '') diff --git a/waterbutler/providers/s3/streams.py b/waterbutler/providers/s3/streams.py index 2430f256c..5d403d4bd 100644 --- a/waterbutler/providers/s3/streams.py +++ b/waterbutler/providers/s3/streams.py @@ -20,5 +20,10 @@ def __init__(self, data): def size(self): return self._size - async def _read(self, n=-1): - return self.streaming_body.read(amt=n) + async def _read(self, n=None): + n = self._size if n is None else n + + chunk = self.streaming_body.read(amt=n) + if not chunk: + self.feed_eof() + return chunk From d180d0d7b6f5e0eb503d1e579e0f3ca678b01458 Mon Sep 17 00:00:00 2001 From: Josh Bird Date: Tue, 17 Jul 2018 17:49:55 -0400 Subject: [PATCH 4/6] Remove dead request Removes some code that was written during trying to get the provider working. This code was exploratory and does not work. --- waterbutler/providers/s3/provider.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/waterbutler/providers/s3/provider.py b/waterbutler/providers/s3/provider.py index d1ca26efa..ba6ccec28 100644 --- a/waterbutler/providers/s3/provider.py +++ b/waterbutler/providers/s3/provider.py @@ -213,30 +213,6 @@ async def upload(self, stream, path, conflict='replace', **kwargs): Body=(await stream.read()) # NBeeds to calculate hash inside boto - some issue with not implementing buffer api. ) - """ - bucket = self.s3.Bucket(self.bucket_name) - sign_url = lambda: bucket.meta.client.generate_presigned_url( - ClientMethod='put_object', - Params={ - 'Bucket': self.bucket_name, - 'Key': path.path, - 'ContentLength': str(stream.size) - }, - ExpiresIn=settings.TEMP_URL_SECS, - HttpMethod='PUT', - ) - - resp = await self.make_request( - 'PUT', - sign_url, - data=stream, - skip_auto_headers={'CONTENT-TYPE'}, - headers=headers, - expects=(200, 201, ), - throws=exceptions.UploadError, - ) - """ - # md5 is returned as ETag header as long as server side encryption is not used. 
if stream.writers['md5'].hexdigest != resp['ETag'].replace('"', ''): raise exceptions.UploadChecksumMismatchError() From f83e2cb8c363f1be0c4c516105e8aa8630711db2 Mon Sep 17 00:00:00 2001 From: Josh Bird Date: Thu, 19 Jul 2018 13:33:48 -0400 Subject: [PATCH 5/6] Adds logging and handle single request at a time --- waterbutler/auth/osf/handler.py | 1 + waterbutler/providers/s3/metadata.py | 2 +- waterbutler/providers/s3/provider.py | 113 ++++++++---------- .../server/api/v1/provider/__init__.py | 20 ++++ 4 files changed, 73 insertions(+), 63 deletions(-) diff --git a/waterbutler/auth/osf/handler.py b/waterbutler/auth/osf/handler.py index 6f976976f..174e230cb 100644 --- a/waterbutler/auth/osf/handler.py +++ b/waterbutler/auth/osf/handler.py @@ -40,6 +40,7 @@ def build_payload(self, bundle, view_only=None, cookie=None): return query_params async def make_request(self, params, headers, cookies): + data = None try: response = await aiohttp.request( 'get', diff --git a/waterbutler/providers/s3/metadata.py b/waterbutler/providers/s3/metadata.py index a019833a4..fb52a1dab 100644 --- a/waterbutler/providers/s3/metadata.py +++ b/waterbutler/providers/s3/metadata.py @@ -138,7 +138,7 @@ def version(self): @property def modified(self): - return self.raw['LastModified'] + return utils.normalize_datetime(self.raw['LastModified']) @property def extra(self): diff --git a/waterbutler/providers/s3/provider.py b/waterbutler/providers/s3/provider.py index ba6ccec28..2a501dc37 100644 --- a/waterbutler/providers/s3/provider.py +++ b/waterbutler/providers/s3/provider.py @@ -36,6 +36,7 @@ logger = logging.getLogger(__name__) + class S3Provider(provider.BaseProvider): """Provider for Amazon's S3 cloud storage service. @@ -64,8 +65,10 @@ def __init__(self, auth, credentials, settings): :param dict credentials: Dict containing `access_key` and `secret_key` :param dict settings: Dict containing `bucket` """ + logger.info("__init__") super().__init__(auth, credentials, settings) + logger.info("About to create resource") self.s3 = boto3.resource( 's3', endpoint_url='http{}://{}:{}'.format( @@ -75,27 +78,23 @@ def __init__(self, auth, credentials, settings): ), aws_access_key_id=credentials['access_key'], aws_secret_access_key=credentials['secret_key'], - # config=Config(signature_version='s3v4'), - # region_name='us-east-1' ) + logger.info("Resource created") + + logger.info("About to create bucket") + self.bucket = self.s3.Bucket(settings['bucket']) + logger.info("About to load bucket") + self.bucket.load() + logger.info("Bucket loaded") - self.connection = S3Connection( - credentials['access_key'], - credentials['secret_key'], - calling_format=OrdinaryCallingFormat(), - host=credentials['host'], - port=credentials['port'], - is_secure=False - ) self.bucket_name = settings['bucket'] - self.bucket = self.connection.get_bucket(settings['bucket'], validate=False) self.encrypt_uploads = self.settings.get('encrypt_uploads', False) self.encrypt_uploads = False self.region = None async def validate_v1_path(self, path, **kwargs): - logger.info("Validate") + logger.info("validate_v1_path") wb_path = WaterButlerPath(path) if path == '/': @@ -111,21 +110,26 @@ async def validate_v1_path(self, path, **kwargs): return wb_path async def validate_path(self, path, **kwargs): + logger.info("validate_path") return WaterButlerPath(path) def can_duplicate_names(self): + logger.info("can_duplicate_names") return True def can_intra_copy(self, dest_provider, path=None): + logger.info("can_intra_copy") return type(self) == type(dest_provider) 
and not getattr(path, 'is_dir', False) def can_intra_move(self, dest_provider, path=None): + logger.info("can_intra_move") return type(self) == type(dest_provider) and not getattr(path, 'is_dir', False) async def intra_copy(self, dest_provider, source_path, dest_path): """Copy key from one S3 bucket to another. The credentials specified in `dest_provider` must have read access to `source.bucket`. """ + logger.info("intra_copy") exists = await dest_provider.exists(dest_path) dest_key = dest_provider.bucket.new_key(dest_path.path) @@ -158,8 +162,7 @@ async def download(self, path, accept_url=False, revision=None, range=None, **kw :rtype: :class:`waterbutler.core.streams.ResponseStreamReader` :raises: :class:`waterbutler.core.exceptions.DownloadError` """ - logger.info("Download") - + logger.info("download") get_kwargs = {} if not path.is_file: @@ -178,7 +181,7 @@ async def download(self, path, accept_url=False, revision=None, range=None, **kw try: res = self.s3.Object( - self.bucket_name, + self.bucket.name, path.path ).get(**get_kwargs) except: @@ -194,8 +197,7 @@ async def upload(self, stream, path, conflict='replace', **kwargs): :rtype: dict, bool """ - logger.info("Upload") - + logger.info("upload") path, exists = await self.handle_name_conflict(path, conflict=conflict) stream.add_writer('md5', streams.HashStreamWriter(hashlib.md5)) @@ -210,7 +212,7 @@ async def upload(self, stream, path, conflict='replace', **kwargs): self.bucket_name, path.path ).put( - Body=(await stream.read()) # NBeeds to calculate hash inside boto - some issue with not implementing buffer api. + Body=(await stream.read()) # Needs to calculate hash inside boto so can't do a request manually? - some issue with not implementing buffer api. ) # md5 is returned as ETag header as long as server side encryption is not used. @@ -225,6 +227,7 @@ async def delete(self, path, confirm_delete=0, **kwargs): :param str path: The path of the key to delete :param int confirm_delete: Must be 1 to confirm root folder delete """ + logger.info("delete") if path.is_root: if not confirm_delete == 1: raise exceptions.DeleteError( @@ -243,11 +246,11 @@ async def _delete_file(self, path, **kwargs): Called from: func: delete if path.is_file """ - bucket = self.s3.Bucket(self.bucket_name) - sign_url = lambda: bucket.meta.client.generate_presigned_url( + logger.info("_delete_file") + sign_url = lambda: self.bucket.meta.client.generate_presigned_url( 'delete_object', Params={ - 'Bucket': self.bucket_name, + 'Bucket': self.bucket.name, 'Key': path.path }, ExpiresIn=settings.TEMP_URL_SECS, @@ -277,12 +280,13 @@ async def _delete_folder(self, path, **kwargs): To fully delete an occupied folder, we must delete all of the comprising objects. Amazon provides a bulk delete operation to simplify this. 
""" + logger.info("_delete_folder") for page in self.s3.meta.client.get_paginator('list_objects').paginate( - Bucket=self.bucket_name, + Bucket=self.bucket.name, Prefix=path.path ): - self.s3.meta.client.delete_objects( - Bucket=self.bucket_name, + self.s3.meta.client.delete_objects( # Signing a delete_objects url with boto3 requires witchcraft + Bucket=self.bucket.name, Delete={ 'Objects': [{'Key': item['Key']} for item in page['Contents']] } @@ -294,29 +298,23 @@ async def revisions(self, path, **kwargs): :param str path: The path to a key :rtype list: """ - query_params = {'prefix': path.path, 'delimiter': '/', 'versions': ''} - url = functools.partial(self.bucket.generate_url, settings.TEMP_URL_SECS, 'GET', query_parameters=query_params) logger.info("revisions") try: - resp = await self.make_request( - 'GET', - url, - params=query_params, - expects=(200, ), - throws=exceptions.MetadataError, + resp = self.bucket.meta.client.list_object_versions( + Bucket=self.bucket.name, + Delimiter='/', + Prefix=path.path ) - content = await resp.read() - versions = xmltodict.parse(content)['ListVersionsResult'].get('Version') or [] - - if isinstance(versions, dict): - versions = [versions] + versions = resp['Versions'] return [ S3Revision(item) for item in versions if item['Key'] == path.path ] - except: + + except Exception as err: + logger.info(err) return [] async def metadata(self, path, revision=None, **kwargs): @@ -328,7 +326,7 @@ async def metadata(self, path, revision=None, **kwargs): logger.info("metadata") if path.is_dir: - return (await self._metadata_folder(path.path)) + return (await self._metadata_folder(path.path)) # # store a hash of these args and the result in redis? return (await self._metadata_file(path.path, revision=revision)) @@ -337,29 +335,26 @@ async def create_folder(self, path, folder_precheck=True, **kwargs): """ :param str path: The path to create a folder at """ - logger.info("Create_folder") + logger.info("create_folder") WaterButlerPath.validate_folder(path) if folder_precheck: - # We should have already validated the path at this point - we + # We should have already validated the path at this point? - we # should store the value so when we're here we dont make an extra # request. - # If the folder exists, why do we need to throw? the user has asked - # us to make sure a folder with a certain name exists in the - # provider, if it already exsists, can we return Not Modified? if (await self.exists(path)): raise exceptions.FolderNamingConflict(path.name) - self.s3.Object( - self.bucket_name, - path.path - ).put() - # throws=exceptions.CreateFolderError + self.bucket.meta.client.put_object( + Bucket=self.bucket.name, + Key=path.path + ) return S3FolderMetadata({'Prefix': path.path}) async def _metadata_file(self, path, revision=None): - logger.info("metadata_file") - + """Load metadata for a single object in the bucket. 
+ """ + logger.info("_metadata_file") if ( revision == 'Latest' or revision == '' or @@ -376,7 +371,9 @@ async def _metadata_file(self, path, revision=None): revision ) try: + logger.info("About to load") obj.load() + logger.info("After load") except ClientError as err: if err.response['Error']['Code'] == '404': raise exceptions.NotFoundError(path) @@ -390,25 +387,17 @@ async def _metadata_folder(self, path): contents at the root of the bucket, or a folder has been selected as a prefix by the user """ - logger.info("metadata_folder") - bucket = self.s3.Bucket(self.bucket_name) - logger.info("LIST_OBJECTS") - result = bucket.meta.client.list_objects(Bucket=bucket.name, Prefix=path, Delimiter='/') + logger.info("_metadata_folder") + result = self.bucket.meta.client.list_objects(Bucket=self.bucket.name, Prefix=path, Delimiter='/') prefixes = result.get('CommonPrefixes', []) contents = result.get('Contents', []) - logger.info(bucket) - logger.info(result) - logger.info(prefixes) - logger.info(contents) if not contents and not prefixes and not path == "": # If contents and prefixes are empty then this "folder" # must exist as a key with a / at the end of the name # if the path is root there is no need to test if it exists - obj = self.s3.Object(self.bucket_name, path) + obj = self.s3.Object(self.bucket.name, path) try: - logger.info("Trying to load object") - logger.info(path) obj.load() except ClientError as err: if err.response['Error']['Code'] == '404': diff --git a/waterbutler/server/api/v1/provider/__init__.py b/waterbutler/server/api/v1/provider/__init__.py index b31c9e6d0..8d4dd0b98 100644 --- a/waterbutler/server/api/v1/provider/__init__.py +++ b/waterbutler/server/api/v1/provider/__init__.py @@ -36,7 +36,18 @@ class ProviderHandler(core.BaseHandler, CreateMixin, MetadataMixin, MoveCopyMixi POST_VALIDATORS = {'put': 'postvalidate_put'} PATTERN = r'/resources/(?P(?:\w|\d)+)/providers/(?P(?:\w|\d)+)(?P/.*/?)' + mutex = False + async def prepare(self, *args, **kwargs): + + + while self.mutex: + logger.info("Deferring request...") + await asyncio.sleep(1) + ProviderHandler.mutex = True + + logger.info("Handling new request!!") + logger.info([h for h in self.request.headers.get_all()]) method = self.request.method.lower() # TODO Find a nicer way to handle this @@ -128,6 +139,14 @@ async def data_received(self, chunk): else: self.body += chunk + def finish(self): + logger.info("REQUEST FINISHED") + import pdb + pdb.set_trace() + ProviderHandler.mutex = False + super().finish() + + async def prepare_stream(self): """Sets up an asyncio pipe from client to server Only called on PUT when path is to a file @@ -174,6 +193,7 @@ def on_finish(self): self._send_hook(action) + def _send_hook(self, action): source = None destination = None From ada59b224cad3c30298595858cb3500ebbc1e97c Mon Sep 17 00:00:00 2001 From: Josh Bird Date: Mon, 30 Jul 2018 18:13:09 -0400 Subject: [PATCH 6/6] Finalize changes to url signers Finishes updates to the sign_url functions to allow use with boto3's `generate_presigned_url` --- requirements.txt | 2 +- waterbutler/providers/s3/metadata.py | 20 +- waterbutler/providers/s3/provider.py | 646 +++++++++++++----- .../server/api/v1/provider/__init__.py | 25 +- 4 files changed, 490 insertions(+), 203 deletions(-) diff --git a/requirements.txt b/requirements.txt index e844bc12b..976055383 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,7 +16,7 @@ raven==5.27.0 setuptools==37.0.0 stevedore==1.2.0 tornado==4.3 -xmltodict==0.9.0 +xmltodict==0.11.0 # Issue: 
certifi-2015.9.6.1 and 2015.9.6.2 fail verification (https://github.com/certifi/python-certifi/issues/26) certifi==2015.4.28 diff --git a/waterbutler/providers/s3/metadata.py b/waterbutler/providers/s3/metadata.py index fb52a1dab..9e9acde45 100644 --- a/waterbutler/providers/s3/metadata.py +++ b/waterbutler/providers/s3/metadata.py @@ -17,7 +17,7 @@ def name(self): class S3FileMetadataHeaders(S3Metadata, metadata.BaseFileMetadata): - def __init__(self, path, headers): + def __init__(self, path, s3_object=None, headers=None): self._path = path self.obj = headers self._etag = None @@ -31,15 +31,15 @@ def path(self): @property def size(self): - return self.obj.content_length + return self.raw['CONTENT-LENGTH'] @property def content_type(self): - return self.obj.content_type + return self.raw['CONTENT-TYPE'] @property def modified(self): - return utils.normalize_datetime(self.obj.last_modified) + return self.raw['LAST-MODIFIED'] @property def created_utc(self): @@ -47,17 +47,15 @@ def created_utc(self): @property def etag(self): - if self._etag: - return self._etag - else: - self._etag = self.obj.e_tag.replace('"', '') - return self._etag + if self._etag is None: + self._etag = self.raw['ETAG'].replace('"', '') + return self._etag @property def extra(self): return { 'md5': self.etag, - 'encryption': self.obj.server_side_encryption, + 'encryption': self.raw.get('X-AMZ-SERVER-SIDE-ENCRYPTION', ''), 'hashes': { 'md5': self.etag, }, @@ -76,7 +74,7 @@ def size(self): @property def modified(self): - return self.raw['LastModified'].isoformat() + return self.raw['LastModified'] @property def created_utc(self): diff --git a/waterbutler/providers/s3/provider.py b/waterbutler/providers/s3/provider.py index 2a501dc37..42157a086 100644 --- a/waterbutler/providers/s3/provider.py +++ b/waterbutler/providers/s3/provider.py @@ -1,3 +1,4 @@ +import asyncio import os import itertools import hashlib @@ -9,6 +10,7 @@ import xml.sax.saxutils +#import aioboto3 from boto import config as boto_config from boto.compat import BytesIO # type: ignore from boto.utils import compute_md5 @@ -16,9 +18,16 @@ from boto.s3.connection import S3Connection from boto.s3.connection import OrdinaryCallingFormat + import boto3 +from botocore.awsrequest import prepare_request_dict from botocore.client import Config -from botocore.exceptions import ClientError +from botocore.exceptions import ( + ClientError, + UnknownClientMethodError +) +from botocore.signers import _should_use_global_endpoint + from waterbutler.core import streams from waterbutler.core import provider @@ -36,6 +45,73 @@ logger = logging.getLogger(__name__) +def generate_presigned_url(self, ClientMethod, Params=None, Headers=None, ExpiresIn=3600, + HttpMethod=None): + """Generate a presigned url given a client, its method, and arguments + :type ClientMethod: string + :param ClientMethod: The client method to presign for + :type Params: dict + :param Params: The parameters normally passed to + ``ClientMethod``. + :type ExpiresIn: int + :param ExpiresIn: The number of seconds the presigned url is valid + for. By default it expires in an hour (3600 seconds) + :type HttpMethod: string + :param HttpMethod: The http method to use on the generated url. By + default, the http method is whatever is used in the method's model. 
+ :returns: The presigned url + """ + client_method = ClientMethod + params = Params + if params is None: + params = {} + # + headers = Headers + # + expires_in = ExpiresIn + http_method = HttpMethod + context = { + 'is_presign_request': True, + 'use_global_endpoint': _should_use_global_endpoint(self), + } + + request_signer = self._request_signer + serializer = self._serializer + + try: + operation_name = self._PY_TO_OP_NAME[client_method] + except KeyError: + raise UnknownClientMethodError(method_name=client_method) + + operation_model = self.meta.service_model.operation_model( + operation_name) + + params = self._emit_api_params(params, operation_model, context) + + # Create a request dict based on the params to serialize. + request_dict = serializer.serialize_to_request( + params, operation_model) + + logger.info(headers) + logger.info(request_dict) + + # Switch out the http method if user specified it. + if http_method is not None: + request_dict['method'] = http_method + + # + if headers is not None: + request_dict['headers'].update(headers) + # + # Prepare the request dict by including the client's endpoint url. + prepare_request_dict( + request_dict, endpoint_url=self.meta.endpoint_url, context=context) + + # Generate the presigned url. + return request_signer.generate_presigned_url( + request_dict=request_dict, expires_in=expires_in, + operation_name=operation_name) + class S3Provider(provider.BaseProvider): """Provider for Amazon's S3 cloud storage service. @@ -53,6 +129,9 @@ class S3Provider(provider.BaseProvider): * A GET prefix query against a non-existent path returns 200 """ NAME = 's3' + _s3 = None + _client = None + _region = None def __init__(self, auth, credentials, settings): """Initialize S3Provider @@ -65,36 +144,59 @@ def __init__(self, auth, credentials, settings): :param dict credentials: Dict containing `access_key` and `secret_key` :param dict settings: Dict containing `bucket` """ - logger.info("__init__") super().__init__(auth, credentials, settings) - logger.info("About to create resource") - self.s3 = boto3.resource( - 's3', - endpoint_url='http{}://{}:{}'.format( - 's' if credentials['encrypted'] else '', - credentials['host'], - credentials['port'] - ), - aws_access_key_id=credentials['access_key'], - aws_secret_access_key=credentials['secret_key'], - ) - logger.info("Resource created") - - logger.info("About to create bucket") - self.bucket = self.s3.Bucket(settings['bucket']) - logger.info("About to load bucket") - self.bucket.load() - logger.info("Bucket loaded") - + self.credentials = credentials self.bucket_name = settings['bucket'] self.encrypt_uploads = self.settings.get('encrypt_uploads', False) - self.encrypt_uploads = False - self.region = None - async def validate_v1_path(self, path, **kwargs): + # TODO Move client creaation to `__aenter__` + @property + async def client(self): + if self._client is None: + # In order to make a client that we can use on any region, we need + # to supply the client with a string of the region name. First we + # make a temporary client in order to get that string. We put the + # client creation inside a lmabda so its easier to call twice. + # This must be a lambda an *not* a partial, because we want the + # expression reevaluated each time. 
+ _make_client = lambda: boto3.client( + 's3', + region_name=self._region, + aws_access_key_id=self.credentials['access_key'], + aws_secret_access_key=self.credentials['secret_key'], + endpoint_url='http{}://{}:{}'.format( + 's' if self.credentials['encrypted'] else '', + self.credentials['host'], + self.credentials['port'] + ) if self.credentials['host'] != 's3.amazonaws.com' else None + ) + self._region = _make_client().get_bucket_location( + Bucket=self.bucket_name + ).get('LocationConstraint', None) + # Remake client after getting bucket location + self._client = _make_client() + # Put the patched version of the url signer on the client. + self._client.__class__.generate_presigned_url = generate_presigned_url + return self._client + + @property + def s3(self): + if self._s3 is None: + self._s3 = boto3.resource('s3') + return self._s3 + + @property + async def region(self): + # Awaiting self.client ensures the region is set properly; if we have a + # client set on our provider, we know the region is correct because we + # need the region in order to make the client. + await self.client + return self._region - logger.info("validate_v1_path") + async def validate_v1_path(self, path, **kwargs): + """Validates a waterbutler path + """ wb_path = WaterButlerPath(path) if path == '/': @@ -109,48 +211,52 @@ async def validate_v1_path(self, path, **kwargs): return wb_path + # Do we call this anywhere, and why can't we just use the constructor? async def validate_path(self, path, **kwargs): - logger.info("validate_path") return WaterButlerPath(path) def can_duplicate_names(self): - logger.info("can_duplicate_names") + # TODO This should be a class attribute return True def can_intra_copy(self, dest_provider, path=None): - logger.info("can_intra_copy") return type(self) == type(dest_provider) and not getattr(path, 'is_dir', False) def can_intra_move(self, dest_provider, path=None): - logger.info("can_intra_move") return type(self) == type(dest_provider) and not getattr(path, 'is_dir', False) async def intra_copy(self, dest_provider, source_path, dest_path): """Copy key from one S3 bucket to another. The credentials specified in `dest_provider` must have read access to `source.bucket`. 
""" - logger.info("intra_copy") exists = await dest_provider.exists(dest_path) - dest_key = dest_provider.bucket.new_key(dest_path.path) + # TODO move this to `__aenter__` + client = await self.client # ensure no left slash when joining paths source_path = '/' + os.path.join(self.settings['bucket'], source_path.path) headers = {'x-amz-copy-source': parse.quote(source_path)} - url = functools.partial( - dest_key.generate_url, - settings.TEMP_URL_SECS, - 'PUT', - headers=headers, + + sign_url = lambda: client.generate_presigned_url( + 'copy_object', + Params={ + 'Bucket': self.bucket_name, + 'CopySource': source_path, + 'Key': dest_path.path + }, + ExpiresIn=settings.TEMP_URL_SECS, ) - resp = await self.make_request( - 'PUT', url, + response = await self.make_request( + 'PUT', + sign_url, + expects={200}, skip_auto_headers={'CONTENT-TYPE'}, headers=headers, - expects=(200, ), - throws=exceptions.IntraCopyError, + throws=exceptions.IntraCopyError ) - await resp.release() + await response.release() + return (await dest_provider.metadata(dest_path)), not exists async def download(self, path, accept_url=False, revision=None, range=None, **kwargs): @@ -162,7 +268,6 @@ async def download(self, path, accept_url=False, revision=None, range=None, **kw :rtype: :class:`waterbutler.core.streams.ResponseStreamReader` :raises: :class:`waterbutler.core.exceptions.DownloadError` """ - logger.info("download") get_kwargs = {} if not path.is_file: @@ -171,23 +276,35 @@ async def download(self, path, accept_url=False, revision=None, range=None, **kw if range: get_kwargs['Range'] = 'bytes={}-{}'.format('', '') - if kwargs.get('displayName'): - get_kwargs['ResponseContentDisposition'] = 'attachment; filename*=UTF-8\'\'{}'.format(parse.quote(kwargs['displayName'])) - else: - get_kwargs['ResponseContentDisposition'] = 'attachment' + # if kwargs.get('displayName'): + # get_kwargs['ResponseContentDisposition'] = 'attachment; filename*=UTF-8\'\'{}'.format(parse.quote(kwargs['displayName'])) + # else: + # get_kwargs['ResponseContentDisposition'] = 'attachment' if revision: get_kwargs['VersionId'] = revision - try: - res = self.s3.Object( - self.bucket.name, - path.path - ).get(**get_kwargs) - except: - raise exceptions.DownloadError() + # TODO move this to `__aenter__` + client = await self.client + + sign_url = lambda: client.generate_presigned_url( + 'get_object', + Params={ + 'Bucket': self.bucket_name, + 'Key': path.path + }, + ExpiresIn=settings.TEMP_URL_SECS, + HttpMethod='GET' + ) - return S3ResponseBodyStream(res) + response = await self.make_request( + 'GET', + sign_url, + range=range, + expects={200, 206}, + throws=exceptions.DownloadError + ) + return streams.ResponseStreamReader(response) async def upload(self, stream, path, conflict='replace', **kwargs): """Uploads the given stream to S3 @@ -197,10 +314,11 @@ async def upload(self, stream, path, conflict='replace', **kwargs): :rtype: dict, bool """ - logger.info("upload") path, exists = await self.handle_name_conflict(path, conflict=conflict) stream.add_writer('md5', streams.HashStreamWriter(hashlib.md5)) + # TODO move this to `__aenter__` + client = await self.client headers = {'Content-Length': str(stream.size)} # this is usually set in boto.s3.key.generate_url, but do it here @@ -208,15 +326,30 @@ async def upload(self, stream, path, conflict='replace', **kwargs): if self.encrypt_uploads: headers['x-amz-server-side-encryption'] = 'AES256' - resp = self.s3.Object( - self.bucket_name, - path.path - ).put( - Body=(await stream.read()) # Needs to calculate 
+        sign_url = lambda: client.generate_presigned_url(
+            'put_object',
+            Params={
+                'Bucket': self.bucket_name,
+                'Key': path.path,
+                'ContentLength': stream.size,
+                **({'ServerSideEncryption': 'AES256'} if self.encrypt_uploads else {})
+            },
+            ExpiresIn=settings.TEMP_URL_SECS,
         )
+
+        response = await self.make_request(
+            'PUT',
+            sign_url,
+            data=stream,
+            skip_auto_headers={'CONTENT-TYPE'},
+            headers=headers,
+            expects={200, 201},
+            throws=exceptions.UploadError
+        )
+        await response.release()
+
         # md5 is returned as ETag header as long as server side encryption is not used.
-        if stream.writers['md5'].hexdigest != resp['ETag'].replace('"', ''):
+        if stream.writers['md5'].hexdigest != response.headers['ETag'].replace('"', ''):
             raise exceptions.UploadChecksumMismatchError()

         return (await self.metadata(path, **kwargs)), not exists

     async def delete(self, path, confirm_delete=0, **kwargs):
@@ -227,7 +360,6 @@ async def delete(self, path, confirm_delete=0, **kwargs):
         :param str path: The path of the key to delete
         :param int confirm_delete: Must be 1 to confirm root folder delete
         """
-        logger.info("delete")
         if path.is_root:
             if not confirm_delete == 1:
                 raise exceptions.DeleteError(
@@ -244,13 +376,13 @@ async def _delete_file(self, path, **kwargs):
         """Deletes a single object located at a certain key.

         Called from: func: delete if path.is_file
         """
-        logger.info("_delete_file")
-        sign_url = lambda: self.bucket.meta.client.generate_presigned_url(
+        # TODO move this to `__aenter__`
+        client = await self.client
+        sign_url = lambda: client.generate_presigned_url(
             'delete_object',
             Params={
-                'Bucket': self.bucket.name,
+                'Bucket': self.bucket_name,
                 'Key': path.path
             },
             ExpiresIn=settings.TEMP_URL_SECS,
@@ -280,17 +412,97 @@ async def _delete_folder(self, path, **kwargs):
         To fully delete an occupied folder, we must delete all of the comprising
         objects. Amazon provides a bulk delete operation to simplify this.
         """
-        logger.info("_delete_folder")
-        for page in self.s3.meta.client.get_paginator('list_objects').paginate(
-            Bucket=self.bucket.name,
-            Prefix=path.path
-        ):
-            self.s3.meta.client.delete_objects(  # Signing a delete_objects url with boto3 requires witchcraft
-                Bucket=self.bucket.name,
-                Delete={
-                    'Objects': [{'Key': item['Key']} for item in page['Contents']]
+        # TODO move this to `__aenter__`
+        client = await self.client
+
+        # Needs to be a lambda, *not* a partial, so `marker` is reevaluated
+        # each time it's called. This is done so that we don't need to
+        # create a new callable object for each request we make.

+        # The weirdness with using ** on the Params is because boto3 is
+        # rather draconian about arguments; passing None for this param results
+        # in an error complaining that None is not of type string.
+
+        # `marker` needs to be defined before the url signer so that it exists
+        # when the url signer is defined. It is used for pagination, to
+        # determine which page is returned by the request.
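        # Illustrative aside, not part of this patch: the bulk-delete payload
        # built below is S3's Multi-Object Delete XML body, e.g. for two keys:
        #
        #     <Delete>
        #       <Object><Key>folder/a.txt</Key></Object>
        #       <Object><Key>folder/b.txt</Key></Object>
        #     </Delete>
        #
        # It is POSTed to the presigned `delete_objects` URL along with the
        # Content-MD5 of the body, which S3 requires for this operation.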
+        marker = None
+        sign_list_url = lambda: client.generate_presigned_url(
+            'list_objects_v2',
+            Params={
+                'Bucket': self.bucket_name,
+                'Prefix': path.path,
+                'Delimiter': '/',
+                **({'ContinuationToken': marker} if marker is not None else {})
+            },
+            ExpiresIn=settings.TEMP_URL_SECS
+        )
+
+        objects_to_delete = []
+
+        delete_payload = b''
+        sign_delete_url = lambda: client.generate_presigned_url(
+            'delete_objects',
+            Params={
+                'Bucket': self.bucket_name,
+                'Delete': {
+                    'Objects': [{'Key': object['Key']} for object in objects_to_delete]
+                }
+            },
+            Headers={
+                'Content-Length': str(len(delete_payload)),
+                'Content-MD5': compute_md5(BytesIO(delete_payload))[1],
+                'Content-Type': 'text/xml'
+            }
+        )
+
+        # S3 'truncates' responses that would list over 1000 objects. The
+        # response will contain a key, 'IsTruncated', if there were more than
+        # 1000 objects. Before the first request, we assume the list is
+        # truncated, so that at least one request will be made.
+        truncated = True
+        while truncated:
+            list_response = await self.make_request(
+                'GET',
+                sign_list_url,
+                expects={200, 204},
+                throws=exceptions.MetadataError,
+            )
+            page = xmltodict.parse(
+                await list_response.read(),
+                strip_whitespace=False,
+                force_list={'Contents'}
+            )
+            marker = page['ListBucketResult'].get('NextContinuationToken', None)
+            truncated = page['ListBucketResult'].get('IsTruncated', 'false') != 'false'
+
+            objects_to_delete = page['ListBucketResult'].get('Contents', [])
+
+            delete_payload = '<Delete>{}</Delete>'.format(
+                ''.join([
+                    '<Object><Key>{}</Key></Object>'.format(object['Key'])
+                    for object in objects_to_delete
+                ])
+            ).encode('utf-8')
+
+            # TODO Don't wait for the delete to finish before requesting the
+            # next batch, or sending that delete request.
+            delete_response = await self.make_request(
+                'POST',
+                sign_delete_url,
+                data=delete_payload,
+                headers={
+                    'Content-Length': str(len(delete_payload)),
+                    'Content-MD5': compute_md5(BytesIO(delete_payload))[1],
+                    'Content-Type': 'text/xml'
+                }
+            )
+            await delete_response.release()
+            del delete_response
+            del list_response
+
+        # TODO Put the delete requests in a list of tasks and wait for all of them to
+        # finish here, before returning

     async def revisions(self, path, **kwargs):
         """Get past versions of the requested key
@@ -298,127 +510,217 @@ async def revisions(self, path, **kwargs):
         :param str path: The path to a key
         :rtype list:
         """
-        logger.info("revisions")
-        try:
-            resp = self.bucket.meta.client.list_object_versions(
-                Bucket=self.bucket.name,
-                Delimiter='/',
-                Prefix=path.path
-            )
-            versions = resp['Versions']
-
-            return [
-                S3Revision(item)
-                for item in versions
-                if item['Key'] == path.path
-            ]
-
-        except Exception as err:
-            logger.info(err)
-            return []
-
-    async def metadata(self, path, revision=None, **kwargs):
-        """Get Metadata about the requested file or folder
+        # TODO move this to `__aenter__`
+        client = await self.client
+        sign_url = lambda: client.generate_presigned_url(
+            'list_object_versions',
+            Params={
+                'Bucket': self.bucket_name,
+                'Delimiter': '/',
+                'Prefix': path.path
+            },
+            ExpiresIn=settings.TEMP_URL_SECS
+        )

-        :param WaterButlerPath path: The path to a key or folder
-        :rtype: dict or list
-        """
-        logger.info("metadata")
+        response = await self.make_request(
+            'GET',
+            sign_url,
+            expects={200},
+            throws=exceptions.MetadataError
+        )

-        if path.is_dir:
-            return (await self._metadata_folder(path.path))
-        # # store a hash of these args and the result in redis?
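        # Illustrative aside, not part of this patch: a trimmed ListVersionsResult
        # body looks roughly like
        #
        #     <ListVersionsResult>
        #       <Version>
        #         <Key>folder/file.txt</Key>
        #         <VersionId>3HL4kqtJvjVBH40Nrjfkd</VersionId>
        #         <IsLatest>true</IsLatest>
        #         <LastModified>2018-06-13T17:06:03.000Z</LastModified>
        #         <ETag>"fba9dede5f27731c9771645a39863328"</ETag>
        #       </Version>
        #     </ListVersionsResult>
        #
        # force_list={'Version'} keeps a single <Version> element from being parsed
        # as a lone dict instead of a one-item list.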
+        content = await response.read()
+        await response.release()
+
+        versions = xmltodict.parse(
+            content,
+            force_list={'Version'}
+        )['ListVersionsResult'].get('Version', [])

-        return (await self._metadata_file(path.path, revision=revision))
+        return [
+            S3Revision(item)
+            for item in versions
+            if item['Key'] == path.path
+        ]

     async def create_folder(self, path, folder_precheck=True, **kwargs):
-        """
+        """Create an empty object whose key ends with a trailing slash so that
+        it acts as a folder

         :param str path: The path to create a folder at
         """
-        logger.info("create_folder")
+        # TODO move this to `__aenter__`
+        client = await self.client
         WaterButlerPath.validate_folder(path)

         if folder_precheck:
-            # We should have already validated the path at this point? - we
-            # should store the value so when we're here we dont make an extra
-            # request.
             if (await self.exists(path)):
                 raise exceptions.FolderNamingConflict(path.name)

-        self.bucket.meta.client.put_object(
-            Bucket=self.bucket.name,
-            Key=path.path
+        sign_url = lambda: client.generate_presigned_url(
+            'put_object',
+            Params={
+                'Bucket': self.bucket_name,
+                'Key': path.path,
+            },
+            ExpiresIn=settings.TEMP_URL_SECS,
         )
-        return S3FolderMetadata({'Prefix': path.path})

+        async with self.request(
+            'PUT',
+            sign_url,
+            skip_auto_headers={'CONTENT-TYPE'},
+            expects={200, 201},
+            throws=exceptions.CreateFolderError,
+        ):
+            return S3FolderMetadata({'Prefix': path.path})
+
+    async def metadata(self, path, revision=None, **kwargs):
+        """Get Metadata about the requested file or folder
+
+        :param WaterButlerPath path: The path to a key or folder
+        :rtype: dict or list
+        """
+        if path.is_dir:
+            return (await self._metadata_folder(path.path))
+        # store a hash of these args and the result in redis?
+
+        return (await self._metadata_file(path.path, revision=revision))

     async def _metadata_file(self, path, revision=None):
         """Load metadata for a single object in the bucket.
         """
-        logger.info("_metadata_file")
-        if (
-            revision == 'Latest' or
-            revision == '' or
+        # TODO move this to `__aenter__`
+        client = await self.client
+
+        # Homogenise any weird version ids
+        if any({
+            revision == 'Latest',
+            revision == '',
             not revision
-        ):
-            obj = self.s3.Object(
-                self.bucket.name,
-                path
-            )
-        else:
-            obj = self.s3.ObjectVersion(
-                self.bucket.name,
-                path,
-                revision
-            )
-        try:
-            logger.info("About to load")
-            obj.load()
-            logger.info("After load")
-        except ClientError as err:
-            if err.response['Error']['Code'] == '404':
-                raise exceptions.NotFoundError(path)
-            else:
-                raise err
-
-        return S3FileMetadataHeaders(path, obj)
+        }):
+            revision = None
+
+        sign_url = lambda: client.generate_presigned_url(
+            'head_object',
+            Params={
+                'Bucket': self.bucket_name,
+                'Key': path,
+                **({'VersionId': revision} if revision is not None else {})
+            },
+            ExpiresIn=settings.TEMP_URL_SECS,
+        )
+        response = await self.make_request(
+            'HEAD',
+            sign_url,
+            expects={200, 204},
+            throws=exceptions.MetadataError,
+        )
+        await response.release()
+
+        return S3FileMetadataHeaders(
+            path,
+            headers=response.headers  # TODO Fix S3MetadataFileHeaders
+        )

     async def _metadata_folder(self, path):
         """Get metadata about the contents of a bucket.
        This is either the contents at the root of the bucket, or of a folder
        the user has selected as a prefix.
         """
-        logger.info("_metadata_folder")
-        result = self.bucket.meta.client.list_objects(Bucket=self.bucket.name, Prefix=path, Delimiter='/')
-        prefixes = result.get('CommonPrefixes', [])
-        contents = result.get('Contents', [])
-        if not contents and not prefixes and not path == "":
-            # If contents and prefixes are empty then this "folder"
-            # must exist as a key with a / at the end of the name
-            # if the path is root there is no need to test if it exists
-
-            obj = self.s3.Object(self.bucket.name, path)
-            try:
-                obj.load()
-            except ClientError as err:
-                if err.response['Error']['Code'] == '404':
-                    raise exceptions.NotFoundError(path)
-                else:
-                    raise err
-        if isinstance(contents, dict):
-            contents = [contents]
-
-        if isinstance(prefixes, dict):
-            prefixes = [prefixes]
-
-        items = [S3FolderMetadata(item) for item in prefixes]
-
-        for content in contents:
-            if content['Key'] == path:
-                continue
-
-            if content['Key'].endswith('/'):
-                items.append(S3FolderKeyMetadata(content))
-            else:
-                items.append(S3FileMetadata(content))
+        # TODO move this to `__aenter__`
+        client = await self.client
+
+        # Needs to be a lambda, *not* a partial, so `marker` is reevaluated
+        # each time it's called. This is done so that we don't need to
+        # create a new callable object for each request we make.
+
+        # The weirdness with using ** on the Params is because boto3 is
+        # rather draconian about arguments; passing None for this param results
+        # in an error complaining that None is not of type string.
+
+        # `marker` needs to be defined before the url signer so that it exists
+        # when the url signer is defined. It is used for pagination, to
+        # determine which page is returned by the request.
+        marker = None
+        sign_url = lambda: client.generate_presigned_url(
+            'list_objects_v2',
+            Params={
+                'Bucket': self.bucket_name,
+                'Prefix': path,
+                'Delimiter': '/',
+                **({'ContinuationToken': marker} if marker is not None else {})
+            },
+            ExpiresIn=settings.TEMP_URL_SECS
+        )
+
+        # S3 'truncates' responses that would list over 1000 objects. The
+        # response will contain a key, 'IsTruncated', if there were more than
+        # 1000 objects. Before the first request, we assume the list is
+        # truncated, so that at least one request will be made.
+        truncated = True
+
+        # Each request will return 0 or more 'contents' and 'common prefixes'.
+        # Contents contains keys that begin with 'prefix' and contain no
+        # delimiter characters after the characters that match the prefix.
+        # Common prefixes match any keys that do contain a delimiter after the
+        # characters that match the prefix. Each request extends the `contents`
+        # and `prefixes` arrays with the respective contents and prefixes that
+        # were returned in the request.
+        contents = []
+        prefixes = []
+
+        while truncated:
+            response = await self.make_request(
+                'GET',
+                sign_url,
+                expects={200, 204},
+                throws=exceptions.MetadataError,
+            )
+            page = xmltodict.parse(
+                await response.read(),
+                strip_whitespace=False,
+                force_list={'CommonPrefixes', 'Contents'}
+            )
+            prefixes.extend(page['ListBucketResult'].get('CommonPrefixes', []))
+            contents.extend(page['ListBucketResult'].get('Contents', []))
+
+            marker = page['ListBucketResult'].get('NextContinuationToken', None)
+            truncated = page['ListBucketResult'].get('IsTruncated', 'false') != 'false'
+            del response
+
+        del sign_url
+
+        items = []
+        # If there are keys that have the provided prefix...
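        # Illustrative aside, not part of this patch: after the loop the parsed
        # entries look roughly like
        #
        #     prefixes -> [{'Prefix': 'folder/sub/'}]                         # becomes S3FolderMetadata
        #     contents -> [{'Key': 'folder/file.txt', 'Size': '5',
        #                   'LastModified': '2018-06-13T17:06:03.000Z',
        #                   'ETag': '"..."', 'StorageClass': 'STANDARD'}]     # becomes S3FileMetadata
        #
        # (xmltodict leaves every value as a string.)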
+        if contents or prefixes:
+            # Prefixes represent 'folders'
+            items.extend([S3FolderMetadata(prefix) for prefix in prefixes])
+
+            for content in contents:
+                # Only care about items that are not the same as where the
+                # addon is mounted.
+                if content['Key'] != path:
+                    items.append(
+                        S3FolderKeyMetadata(content)
+                        if content['Key'].endswith('/')
+                        else S3FileMetadata(content)
+                    )
+
+        # If contents and prefixes are empty, but this is not the root
+        # path, then this "folder" must exist as a key with a / at the
+        # end of the name.
+        elif not path == "":
+            sign_url = lambda: client.generate_presigned_url(
+                'head_object',
+                Params={
+                    'Bucket': self.bucket_name,
+                    'Key': path,
+                },
+                ExpiresIn=settings.TEMP_URL_SECS
+            )
+            response = await self.make_request(
+                'HEAD',
+                sign_url,
+                expects={200, 204},
+                throws=exceptions.MetadataError,
+            )
+            await response.release()
+            del sign_url
+            del response

         return items
+
+
diff --git a/waterbutler/server/api/v1/provider/__init__.py b/waterbutler/server/api/v1/provider/__init__.py
index 8d4dd0b98..c85cbac6b 100644
--- a/waterbutler/server/api/v1/provider/__init__.py
+++ b/waterbutler/server/api/v1/provider/__init__.py
@@ -32,22 +32,17 @@ def list_or_value(value):

 @tornado.web.stream_request_body
 class ProviderHandler(core.BaseHandler, CreateMixin, MetadataMixin, MoveCopyMixin):
+    """Handler for provider operations.
+
+    Inherits from the provider handler mixins CreateMixin, MetadataMixin, and
+    MoveCopyMixin.
+    """
     PRE_VALIDATORS = {'put': 'prevalidate_put', 'post': 'prevalidate_post'}
     POST_VALIDATORS = {'put': 'postvalidate_put'}
     PATTERN = r'/resources/(?P<resource>(?:\w|\d)+)/providers/(?P<provider>(?:\w|\d)+)(?P<path>/.*/?)'

-    mutex = False
-
     async def prepare(self, *args, **kwargs):
-
-
-        while self.mutex:
-            logger.info("Deferring request...")
-            await asyncio.sleep(1)
-        ProviderHandler.mutex = True
-
-        logger.info("Handling new request!!")
-        logger.info([h for h in self.request.headers.get_all()])
+        """Prepare to handle the request
+        """
         method = self.request.method.lower()

         # TODO Find a nicer way to handle this
@@ -139,14 +134,6 @@ async def data_received(self, chunk):
         else:
             self.body += chunk

-    def finish(self):
-        logger.info("REQUEST FINISHED")
-        import pdb
-        pdb.set_trace()
-        ProviderHandler.mutex = False
-        super().finish()
-
-
     async def prepare_stream(self):
         """Sets up an asyncio pipe from client to server

         Only called on PUT when path is to a file