3 changes: 3 additions & 0 deletions granule_ingester/docker/entrypoint.sh
@@ -7,6 +7,9 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \
$([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq-password=$RABBITMQ_PASSWORD) \
$([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq-queue=$RABBITMQ_QUEUE) \
$([[ ! -z "$DATA_STORE" ]] && echo --data-store=$DATA_STORE) \
$([[ ! -z "$OBJ_STORE_BUCKET" ]] && echo --object-store-bucket=$OBJ_STORE_BUCKET) \
$([[ ! -z "$OBJ_STORE_REGION" ]] && echo --object-store-region=$OBJ_STORE_REGION) \
$([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra-contact-points=$CASSANDRA_CONTACT_POINTS) \
$([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra-port=$CASSANDRA_PORT) \
$([[ ! -z "$CASSANDRA_KEYSPACE" ]] && echo --cassandra-keyspace=$CASSANDRA_KEYSPACE) \
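A note on the pattern above: each $([[ ! -z "$VAR" ]] && echo --flag=$VAR) clause emits its flag only when the corresponding environment variable is non-empty, so unset variables simply drop out of the assembled command line. As a sketch of how the three new variables would be supplied (the image name and values are placeholders, not part of this PR):

docker run \
  -e DATA_STORE=OBJECT_STORE \
  -e OBJ_STORE_BUCKET=my-tile-bucket \
  -e OBJ_STORE_REGION=us-west-2 \
  sdap/granule-ingester:latest

# entrypoint.sh would then expand to (other flags omitted):
#   python /sdap/granule_ingester/main.py \
#     --data-store=OBJECT_STORE \
#     --object-store-bucket=my-tile-bucket \
#     --object-store-region=us-west-2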
149 changes: 76 additions & 73 deletions granule_ingester/granule_ingester/main.py
@@ -23,10 +23,14 @@
from granule_ingester.consumer import MessageConsumer
from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.writers import CassandraStore, SolrStore
from granule_ingester.writers import CassandraStore, SolrStore, S3ObjectStore
from granule_ingester.writers.ElasticsearchStore import ElasticsearchStore


def obj_store_factory(bucket, region):
return S3ObjectStore(bucket, region)


def cassandra_factory(contact_points, port, keyspace, username, password):
store = CassandraStore(contact_points=contact_points, port=port, keyspace=keyspace, username=username, password=password)
store.connect()
@@ -52,6 +56,8 @@ async def run_health_checks(dependencies: List[HealthCheck]):
return True


VALID_DATA_STORE = ['OBJECT_STORE', 'CASSANDRA']

async def main(loop):
parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process '
'and ingest a granule for each message that comes through.')
@@ -72,7 +78,12 @@ async def main(loop):
default="nexus",
metavar="QUEUE",
help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')


# DATA STORE
parser.add_argument('--data-store',
metavar='DATA_STORE',
required=True,
help=f'Which data store to use. Must be one of {VALID_DATA_STORE}.')
# CASSANDRA
parser.add_argument('--cassandra-contact-points',
default=['localhost'],
@@ -95,7 +106,15 @@ async def main(loop):
metavar="PASSWORD",
default=None,
help='Cassandra password. Optional.')

# OBJECT STORE
parser.add_argument('--object-store-bucket',
metavar="OBJECT-STORE-BUCKET",
default=None,
help='Name of the object store bucket to write tiles to. Required if the data store is OBJECT_STORE.')
parser.add_argument('--object-store-region',
metavar="OBJECT-STORE-REGION",
default=None,
help='Region of the object store bucket. Required if the data store is OBJECT_STORE.')
# METADATA STORE
parser.add_argument('--metadata-store',
default='solr',
@@ -153,7 +172,13 @@ async def main(loop):
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
cassandra_keyspace = args.cassandra_keyspace
obj_store_bucket = args.object_store_bucket
obj_store_region = args.object_store_region

data_store = args.data_store.upper()
if data_store not in VALID_DATA_STORE:
logger.error(f'Invalid data store: {data_store}. Must be one of {VALID_DATA_STORE}.')
sys.exit(1)
metadata_store = args.metadata_store

solr_host_and_port = args.solr_host_and_port
@@ -162,78 +187,56 @@ async def main(loop):
elastic_url = args.elastic_url
elastic_username = args.elastic_username
elastic_password = args.elastic_password
elastic_index = args.elastic_index

elastic_index = args.elastic_index

msg_consumer_params = {
'rabbitmq_host': args.rabbitmq_host,
'rabbitmq_username': args.rabbitmq_username,
'rabbitmq_password': args.rabbitmq_password,
'rabbitmq_queue': args.rabbitmq_queue,
}
if metadata_store == 'solr':
consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
rabbitmq_username=args.rabbitmq_username,
rabbitmq_password=args.rabbitmq_password,
rabbitmq_queue=args.rabbitmq_queue,
data_store_factory=partial(cassandra_factory,
cassandra_contact_points,
cassandra_port,
cassandra_keyspace,
cassandra_username,
cassandra_password),
metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
try:
solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
await run_health_checks([CassandraStore(cassandra_contact_points,
cassandra_port,
cassandra_keyspace,
cassandra_username,
cassandra_password),
solr_store,
consumer])
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
await consumer.start_consuming(args.max_threads)
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
except LostConnectionError as e:
logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
finally:
sys.exit(1)

metadata_store_obj = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
msg_consumer_params['metadata_store_factory'] = partial(solr_factory, solr_host_and_port, zk_host_and_port)
else:
metadata_store_obj = ElasticsearchStore(elastic_url, elastic_username, elastic_password, elastic_index)
msg_consumer_params['metadata_store_factory'] = partial(elasticsearch_factory,
elastic_url,
elastic_username,
elastic_password,
elastic_index)
if data_store == 'CASSANDRA':
msg_consumer_params['data_store_factory'] = partial(cassandra_factory,
cassandra_contact_points,
cassandra_port,
cassandra_keyspace,
cassandra_username,
cassandra_password)
data_store_obj = CassandraStore(cassandra_contact_points,
cassandra_port,
cassandra_keyspace,
cassandra_username,
cassandra_password)
elif data_store == 'OBJECT_STORE':
msg_consumer_params['data_store_factory'] = partial(obj_store_factory, obj_store_bucket, obj_store_region)
data_store_obj = S3ObjectStore(obj_store_bucket, obj_store_region)
else:
consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
rabbitmq_username=args.rabbitmq_username,
rabbitmq_password=args.rabbitmq_password,
rabbitmq_queue=args.rabbitmq_queue,
data_store_factory=partial(cassandra_factory,
cassandra_contact_points,
cassandra_port,
cassandra_keyspace,
cassandra_username,
cassandra_password),
metadata_store_factory=partial(elasticsearch_factory,
elastic_url,
elastic_username,
elastic_password,
elastic_index))
try:
es_store = ElasticsearchStore(elastic_url, elastic_username, elastic_password, elastic_index)
await run_health_checks([CassandraStore(cassandra_contact_points,
cassandra_port,
cassandra_keyspace,
cassandra_username,
cassandra_password),
es_store,
consumer])

async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
await consumer.start_consuming(args.max_threads)
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
except LostConnectionError as e:
logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
finally:
sys.exit(1)
logger.error(f'Invalid data store: {data_store}. Must be one of {VALID_DATA_STORE}.')
sys.exit(1)
consumer = MessageConsumer(**msg_consumer_params)
try:
await run_health_checks([data_store_obj, metadata_store_obj, consumer])
async with consumer:
logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
await consumer.start_consuming(args.max_threads)
except FailedHealthCheckError as e:
logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
except LostConnectionError as e:
logger.error(f"{e} Any messages that were being processed have been re-queued. Quitting.")
except Exception as e:
logger.exception(f"Shutting down because of an unrecoverable error:\n{e}")
finally:
sys.exit(1)


if __name__ == '__main__':
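The refactor above collapses two near-duplicate MessageConsumer constructions into a single msg_consumer_params dict that is filled in by independent metadata-store and data-store dispatch. Store configuration is pre-bound into factories with functools.partial so that each consumer worker can build its own store connection on demand instead of sharing one. A minimal sketch of that pattern, with a stub class standing in for the real writers:

from functools import partial


class StubStore:
    """Stand-in for CassandraStore / S3ObjectStore in this sketch."""

    def __init__(self, bucket, region):
        self.bucket = bucket
        self.region = region


def stub_store_factory(bucket, region):
    # Mirrors obj_store_factory: all configuration is bound up front.
    return StubStore(bucket, region)


# Bind configuration once at startup...
data_store_factory = partial(stub_store_factory, 'my-tile-bucket', 'us-west-2')

# ...then every later call yields an independent store, one per worker.
store_a = data_store_factory()
store_b = data_store_factory()
assert store_a is not store_b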
53 changes: 53 additions & 0 deletions granule_ingester/granule_ingester/writers/S3ObjectStore.py
@@ -0,0 +1,53 @@
import logging
from io import BytesIO
from uuid import UUID

import boto3
from granule_ingester.writers.DataStore import DataStore
from nexusproto.DataTile_pb2 import NexusTile

logger = logging.getLogger(__name__)


class S3ObjectStore(DataStore):
"""
Data store backed by an S3-compatible object store; should work with both AWS S3 and Ceph Object Gateway.
"""

def __init__(self, bucket, region, key=None, secret=None, session=None) -> None:
super().__init__()
self.__bucket = bucket
self.__boto3_session = {
'region_name': region,
}
if key is not None:
self.__boto3_session['aws_access_key_id'] = key
if secret is not None:
self.__boto3_session['aws_secret_access_key'] = secret
if session is not None:
self.__boto3_session['aws_session_token'] = session
self.__s3_client = boto3.Session(**self.__boto3_session).client('s3')

async def health_check(self) -> bool:
# A cheap LIST request confirms the bucket exists and is readable.
# Note: boto3 is synchronous, so this call briefly blocks the event loop.
response = self.__s3_client.list_objects_v2(
Bucket=self.__bucket,
MaxKeys=10,
Prefix='',
FetchOwner=False
)
# A successful LIST returns HTTP 200; anything else fails the check.
return response['ResponseMetadata']['HTTPStatusCode'] == 200

def save_data(self, nexus_tile: NexusTile) -> None:
tile_id = str(UUID(str(nexus_tile.summary.tile_id)))
logger.debug(f'saving tile {tile_id}')
# Serialize the TileData protobuf and upload it under the tile UUID as the key.
serialized_tile_data = nexus_tile.tile.SerializeToString()
logger.debug('uploading tile to object store')
self.__s3_client.upload_fileobj(BytesIO(serialized_tile_data), self.__bucket, tile_id)
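A minimal usage sketch of the new store, assuming boto3 resolves credentials through its default chain (environment variables, shared config, or an instance role) since no key/secret is passed; the bucket, region, and tile variable are placeholders:

import asyncio

from granule_ingester.writers import S3ObjectStore

store = S3ObjectStore(bucket='my-tile-bucket', region='us-west-2')

# health_check() issues a small list_objects_v2 call against the bucket.
print(asyncio.run(store.health_check()))

# save_data() serializes nexus_tile.tile (a TileData protobuf) and uploads
# it with the tile's UUID as the object key:
# store.save_data(nexus_tile)  # nexus_tile: a NexusTile built by the pipeline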
1 change: 1 addition & 0 deletions granule_ingester/granule_ingester/writers/__init__.py
@@ -2,3 +2,4 @@
from granule_ingester.writers.MetadataStore import MetadataStore
from granule_ingester.writers.SolrStore import SolrStore
from granule_ingester.writers.CassandraStore import CassandraStore
from granule_ingester.writers.S3ObjectStore import S3ObjectStore