From 23967cd7f8a3cf5e5d80d052dc1eb67d135ce243 Mon Sep 17 00:00:00 2001
From: Wai Phyo
Date: Tue, 21 Sep 2021 13:25:32 -0700
Subject: [PATCH 1/3] feat: add obj store class for S3 + Ceph

---
 .../granule_ingester/writers/S3ObjectStore.py | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)
 create mode 100644 granule_ingester/granule_ingester/writers/S3ObjectStore.py

diff --git a/granule_ingester/granule_ingester/writers/S3ObjectStore.py b/granule_ingester/granule_ingester/writers/S3ObjectStore.py
new file mode 100644
index 0000000..21c1542
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/S3ObjectStore.py
@@ -0,0 +1,57 @@
+import logging
+from io import BytesIO
+from uuid import UUID
+
+import boto3
+from granule_ingester.writers.DataStore import DataStore
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.DataTile_pb2 import TileData, NexusTile
+
+logger = logging.getLogger(__name__)
+
+
+class S3ObjectStore(DataStore):
+    """
+    Usable with both AWS S3 and Ceph Object Store.
+    """
+
+    def __init__(self, bucket, region, key=None, secret=None, session=None) -> None:
+        super().__init__()
+        self.__bucket = bucket
+        self.__boto3_session = {
+            'region_name': region,
+        }
+        if key is not None:
+            self.__boto3_session['aws_access_key_id'] = key
+        if secret is not None:
+            self.__boto3_session['aws_secret_access_key'] = secret
+        if session is not None:
+            self.__boto3_session['aws_session_token'] = session
+        self.__s3_client = boto3.Session(**self.__boto3_session).client('s3')
+
+    async def health_check(self) -> bool:
+        try:
+            response = self.__s3_client.list_objects_v2(
+                Bucket=self.__bucket,
+                # Delimiter='string',
+                # EncodingType='url',
+                MaxKeys=10,
+                Prefix='string',
+                ContinuationToken='string',
+                FetchOwner=False,
+                # StartAfter='string',
+                # RequestPayer='requester',
+                # ExpectedBucketOwner='string'
+            )
+            # TODO inspect response object
+        except Exception as e:
+            return False
+        return True
+
+    def save_data(self, nexus_tile: NexusTile) -> None:
+        tile_id = str(UUID(str(nexus_tile.summary.tile_id)))
+        logger.debug(f'saving data {tile_id}')
+        serialized_tile_data = TileData.SerializeToString(nexus_tile.tile)
+        logger.debug('uploading to object store')
+        self.__s3_client.upload_fileobj(BytesIO(bytearray(serialized_tile_data)), self.__bucket, f'{tile_id}')
+        return
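Note: for orientation, a minimal usage sketch of the writer added above. Only S3ObjectStore and the nexusproto types come from the patch itself; the bucket name, region, and event-loop handling are illustrative assumptions, not part of the change.

    # Hypothetical usage sketch; 'example-tiles' and 'us-west-2' are placeholders.
    import asyncio
    from uuid import uuid4

    from granule_ingester.writers.S3ObjectStore import S3ObjectStore
    from nexusproto.DataTile_pb2 import NexusTile

    store = S3ObjectStore(bucket='example-tiles', region='us-west-2')

    tile = NexusTile()
    # save_data() passes the id through UUID(), so it must parse as a UUID
    tile.summary.tile_id = str(uuid4())

    if asyncio.run(store.health_check()):
        store.save_data(tile)  # serializes tile.tile, uploads under the tile_id key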
From a61ded031f7ad1fb16e8327147e6fe88e246c932 Mon Sep 17 00:00:00 2001
From: Wai Phyo
Date: Tue, 21 Sep 2021 13:29:28 -0700
Subject: [PATCH 2/3] chore: refactor main method to remove duplicate code

---
 granule_ingester/granule_ingester/main.py | 105 ++++++++--------------
 1 file changed, 39 insertions(+), 66 deletions(-)

diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index fc542b0..9454323 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -162,78 +162,51 @@ async def main(loop):
     elastic_url = args.elastic_url
     elastic_username = args.elastic_username
     elastic_password = args.elastic_password
-    elastic_index = args.elastic_index
-
+    elastic_index = args.elastic_index
+
+    msg_consumer_params = {
+        'rabbitmq_host': args.rabbitmq_host,
+        'rabbitmq_username': args.rabbitmq_username,
+        'rabbitmq_password': args.rabbitmq_password,
+        'rabbitmq_queue': args.rabbitmq_queue,
+    }
     if metadata_store == 'solr':
-        consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
-                                   rabbitmq_username=args.rabbitmq_username,
-                                   rabbitmq_password=args.rabbitmq_password,
-                                   rabbitmq_queue=args.rabbitmq_queue,
-                                   data_store_factory=partial(cassandra_factory,
-                                                              cassandra_contact_points,
-                                                              cassandra_port,
-                                                              cassandra_keyspace,
-                                                              cassandra_username,
-                                                              cassandra_password),
-                                   metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
-        try:
-            solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
-            await run_health_checks([CassandraStore(cassandra_contact_points,
-                                                    cassandra_port,
-                                                    cassandra_keyspace,
-                                                    cassandra_username,
-                                                    cassandra_password),
-                                     solr_store,
-                                     consumer])
-            async with consumer:
-                logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-                await consumer.start_consuming(args.max_threads)
-        except FailedHealthCheckError as e:
-            logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
-        except LostConnectionError as e:
-            logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
-        except Exception as e:
-            logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
-        finally:
-            sys.exit(1)
-
+        metadata_store_obj = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+        msg_consumer_params['metadata_store_factory'] = partial(solr_factory, solr_host_and_port, zk_host_and_port)
     else:
-        consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
-                                   rabbitmq_username=args.rabbitmq_username,
-                                   rabbitmq_password=args.rabbitmq_password,
-                                   rabbitmq_queue=args.rabbitmq_queue,
-                                   data_store_factory=partial(cassandra_factory,
+        metadata_store_obj = ElasticsearchStore(elastic_url, elastic_username, elastic_password, elastic_index)
+        msg_consumer_params['metadata_store_factory'] = partial(elasticsearch_factory,
+                                                        elastic_url,
+                                                        elastic_username,
+                                                        elastic_password,
+                                                        elastic_index)
+    # TODO this will also need to check for cassandra vs S3
+    msg_consumer_params['data_store_factory'] = partial(cassandra_factory,
                                                         cassandra_contact_points,
                                                         cassandra_port,
                                                         cassandra_keyspace,
                                                         cassandra_username,
-                                                        cassandra_password),
-                                   metadata_store_factory=partial(elasticsearch_factory,
-                                                                  elastic_url,
-                                                                  elastic_username,
-                                                                  elastic_password,
-                                                                  elastic_index))
-        try:
-            es_store = ElasticsearchStore(elastic_url, elastic_username, elastic_password, elastic_index)
-            await run_health_checks([CassandraStore(cassandra_contact_points,
-                                                    cassandra_port,
-                                                    cassandra_keyspace,
-                                                    cassandra_username,
-                                                    cassandra_password),
-                                     es_store,
-                                     consumer])
-
-            async with consumer:
-                logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-                await consumer.start_consuming(args.max_threads)
-        except FailedHealthCheckError as e:
-            logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
-        except LostConnectionError as e:
-            logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
-        except Exception as e:
-            logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
-        finally:
-            sys.exit(1)
+                                                        cassandra_password)
+    consumer = MessageConsumer(**msg_consumer_params)
+    try:
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_keyspace,
+                                                cassandra_username,
+                                                cassandra_password),
+                                 metadata_store_obj,
+                                 consumer])
+        async with consumer:
+            logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
+            await consumer.start_consuming(args.max_threads)
+    except FailedHealthCheckError as e:
+        logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
+    except LostConnectionError as e:
+        logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
+    except Exception as e:
+        logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
+    finally:
+        sys.exit(1)
 
 
 if __name__ == '__main__':
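Note: the heart of this refactor is building the shared MessageConsumer kwargs once and letting each branch contribute only its store factory via functools.partial. A stripped-down, self-contained sketch of that pattern follows; FakeStore and FakeConsumer are stand-ins, not the real SDAP classes, and real signatures may differ.

    # Sketch of the kwargs-dict + functools.partial pattern used above.
    from functools import partial


    class FakeStore:
        def __init__(self, contact_points, port):
            self.contact_points, self.port = contact_points, port


    class FakeConsumer:
        def __init__(self, rabbitmq_host, rabbitmq_queue, data_store_factory):
            # The factory needs no arguments here: partial() froze them
            # at configuration time, so each worker can just call it.
            self.store = data_store_factory()


    params = {'rabbitmq_host': 'localhost', 'rabbitmq_queue': 'nexus'}
    params['data_store_factory'] = partial(FakeStore, ['localhost'], 9042)
    consumer = FakeConsumer(**params)
    print(consumer.store.port)  # -> 9042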
From 4583dcc4519cad562410c48cf4d6bec5bdcc44b8 Mon Sep 17 00:00:00 2001
From: Wai Phyo
Date: Tue, 21 Sep 2021 15:35:22 -0700
Subject: [PATCH 3/3] breaking: update entry script to select either S3 or Cassandra

---
 granule_ingester/docker/entrypoint.sh         |  3 +
 granule_ingester/granule_ingester/main.py     | 72 +++++++++++++------
 .../granule_ingester/writers/S3ObjectStore.py | 32 ++++-----
 .../granule_ingester/writers/__init__.py      |  1 +
 4 files changed, 69 insertions(+), 39 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 03c0fe3..242d2dc 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,6 +7,9 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
   $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
+  $([[ ! -z "$DATA_STORE" ]] && echo --data-store=$DATA_STORE) \
+  $([[ ! -z "$OBJ_STORE_BUCKET" ]] && echo --object-store-bucket=$OBJ_STORE_BUCKET) \
+  $([[ ! -z "$OBJ_STORE_REGION" ]] && echo --object-store-region=$OBJ_STORE_REGION) \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra-contact-points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra-port=$CASSANDRA_PORT) \
   $([[ ! -z "$CASSANDRA_KEYSPACE" ]] && echo --cassandra-keyspace=$CASSANDRA_KEYSPACE) \
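Note: an aside on the --data-store flag added in the main.py hunk below. The patch validates the value after parsing; argparse could instead enforce it at parse time. A hedged alternative sketch, not what the patch does:

    # Alternative: let argparse normalize and validate --data-store itself.
    import argparse

    VALID_DATA_STORE = ['OBJECT_STORE', 'CASSANDRA']

    parser = argparse.ArgumentParser()
    parser.add_argument('--data-store',
                        required=True,
                        type=str.upper,            # mirrors args.data_store.upper()
                        choices=VALID_DATA_STORE,  # rejects anything else at parse time
                        help=f'Which data store to use. {VALID_DATA_STORE}')

    args = parser.parse_args(['--data-store', 'cassandra'])
    print(args.data_store)  # -> CASSANDRA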
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 9454323..92fd7a7 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -23,10 +23,14 @@ from granule_ingester.consumer import MessageConsumer
 from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore, SolrStore
+from granule_ingester.writers import CassandraStore, SolrStore, S3ObjectStore
 from granule_ingester.writers.ElasticsearchStore import ElasticsearchStore
 
 
+def obj_store_factory(bucket, region):
+    return S3ObjectStore(bucket, region)
+
+
 def cassandra_factory(contact_points, port, keyspace, username, password):
     store = CassandraStore(contact_points=contact_points, port=port, keyspace=keyspace, username=username, password=password)
     store.connect()
     return store
@@ -52,6 +56,8 @@ async def run_health_checks(dependencies: List[HealthCheck]):
     return True
 
 
+VALID_DATA_STORE = ['OBJECT_STORE', 'CASSANDRA']
+
 async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
                                                  'and ingest a granule for each message that comes through.')
@@ -72,7 +78,12 @@ async def main(loop):
                         default="nexus",
                         metavar="QUEUE",
                         help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
-
+
+    # DATA STORE
+    parser.add_argument('--data-store',
+                        metavar='DATA_STORE',
+                        required=True,
+                        help=f'Which data store to use. {VALID_DATA_STORE}')
     # CASSANDRA
     parser.add_argument('--cassandra-contact-points',
                         default=['localhost'],
@@ -95,7 +106,15 @@ async def main(loop):
                         metavar="PASSWORD",
                         default=None,
                         help='Cassandra password. Optional.')
-
+    # OBJECT STORE
+    parser.add_argument('--object-store-bucket',
+                        metavar="OBJECT-STORE-BUCKET",
+                        default=None,
+                        help='Object store bucket name. Required if OBJECT_STORE is used.')
+    parser.add_argument('--object-store-region',
+                        metavar="OBJECT-STORE-REGION",
+                        default=None,
+                        help='Object store region. Required if OBJECT_STORE is used.')
     # METADATA STORE
     parser.add_argument('--metadata-store',
                         default='solr',
@@ -153,7 +172,13 @@ async def main(loop):
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
     cassandra_keyspace = args.cassandra_keyspace
+    obj_store_bucket = args.object_store_bucket
+    obj_store_region = args.object_store_region
 
+    data_store = args.data_store.upper()
+    if data_store not in VALID_DATA_STORE:
+        logger.error(f'Invalid data store: {data_store}. Must be one of {VALID_DATA_STORE}.')
+        sys.exit(1)
     metadata_store = args.metadata_store
 
     solr_host_and_port = args.solr_host_and_port
@@ -176,26 +201,31 @@ async def main(loop):
     else:
         metadata_store_obj = ElasticsearchStore(elastic_url, elastic_username, elastic_password, elastic_index)
         msg_consumer_params['metadata_store_factory'] = partial(elasticsearch_factory,
-                                                        elastic_url,
-                                                        elastic_username,
-                                                        elastic_password,
-                                                        elastic_index)
-    # TODO this will also need to check for cassandra vs S3
-    msg_consumer_params['data_store_factory'] = partial(cassandra_factory,
-                                                        cassandra_contact_points,
-                                                        cassandra_port,
-                                                        cassandra_keyspace,
-                                                        cassandra_username,
-                                                        cassandra_password)
+                                                                elastic_url,
+                                                                elastic_username,
+                                                                elastic_password,
+                                                                elastic_index)
+    if data_store == 'CASSANDRA':
+        msg_consumer_params['data_store_factory'] = partial(cassandra_factory,
+                                                            cassandra_contact_points,
+                                                            cassandra_port,
+                                                            cassandra_keyspace,
+                                                            cassandra_username,
+                                                            cassandra_password)
+        data_store_obj = CassandraStore(cassandra_contact_points,
+                                        cassandra_port,
+                                        cassandra_keyspace,
+                                        cassandra_username,
+                                        cassandra_password)
+    elif data_store == 'OBJECT_STORE':
+        msg_consumer_params['data_store_factory'] = partial(obj_store_factory, obj_store_bucket, obj_store_region)
+        data_store_obj = S3ObjectStore(obj_store_bucket, obj_store_region)
+    else:
+        logger.error(f'Invalid data store: {data_store}. Must be one of {VALID_DATA_STORE}.')
+        sys.exit(1)
     consumer = MessageConsumer(**msg_consumer_params)
     try:
-        await run_health_checks([CassandraStore(cassandra_contact_points,
-                                                cassandra_port,
-                                                cassandra_keyspace,
-                                                cassandra_username,
-                                                cassandra_password),
-                                 metadata_store_obj,
-                                 consumer])
+        await run_health_checks([data_store_obj, metadata_store_obj, consumer])
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
             await consumer.start_consuming(args.max_threads)
diff --git a/granule_ingester/granule_ingester/writers/S3ObjectStore.py b/granule_ingester/granule_ingester/writers/S3ObjectStore.py
index 21c1542..5e988be 100644
--- a/granule_ingester/granule_ingester/writers/S3ObjectStore.py
+++ b/granule_ingester/granule_ingester/writers/S3ObjectStore.py
@@ -4,7 +4,6 @@
 
 import boto3
 from granule_ingester.writers.DataStore import DataStore
-from nexusproto import DataTile_pb2 as nexusproto
 from nexusproto.DataTile_pb2 import TileData, NexusTile
 
 logger = logging.getLogger(__name__)
@@ -30,23 +29,20 @@ def __init__(self, bucket, region, key=None, secret=None, session=None) -> None:
         self.__s3_client = boto3.Session(**self.__boto3_session).client('s3')
 
     async def health_check(self) -> bool:
-        try:
-            response = self.__s3_client.list_objects_v2(
-                Bucket=self.__bucket,
-                # Delimiter='string',
-                # EncodingType='url',
-                MaxKeys=10,
-                Prefix='string',
-                ContinuationToken='string',
-                FetchOwner=False,
-                # StartAfter='string',
-                # RequestPayer='requester',
-                # ExpectedBucketOwner='string'
-            )
-            # TODO inspect response object
-        except Exception as e:
-            return False
-        return True
+        response = self.__s3_client.list_objects_v2(
+            Bucket=self.__bucket,
+            # Delimiter='string',
+            # EncodingType='url',
+            MaxKeys=10,
+            Prefix='',
+            # ContinuationToken='string',
+            FetchOwner=False,
+            # StartAfter='string',
+            # RequestPayer='requester',
+            # ExpectedBucketOwner='string'
+        )
+
+        return response['ResponseMetadata']['HTTPStatusCode'] == 200
 
     def save_data(self, nexus_tile: NexusTile) -> None:
         tile_id = str(UUID(str(nexus_tile.summary.tile_id)))
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
index 9323d8c..702783e 100644
--- a/granule_ingester/granule_ingester/writers/__init__.py
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -2,3 +2,4 @@
 from granule_ingester.writers.MetadataStore import MetadataStore
 from granule_ingester.writers.SolrStore import SolrStore
 from granule_ingester.writers.CassandraStore import CassandraStore
+from granule_ingester.writers.S3ObjectStore import S3ObjectStore
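Note: a closing aside on the reworked health_check. Probing with list_objects_v2 and MaxKeys=10 works, but it issues a billable LIST request and assumes list permission on the bucket. boto3's head_bucket is a lighter reachability probe; a sketch under that assumption, with placeholder bucket and region:

    # Alternative health-check sketch using head_bucket instead of list_objects_v2.
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError


    def bucket_reachable(bucket: str, region: str) -> bool:
        client = boto3.Session(region_name=region).client('s3')
        try:
            client.head_bucket(Bucket=bucket)  # ClientError on 403/404
            return True
        except (ClientError, BotoCoreError):   # BotoCoreError covers network failures
            return False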