From 338156e742c805a4010b7607890fd3a0f62aa655 Mon Sep 17 00:00:00 2001
From: Adam Tengler
Date: Wed, 31 Jan 2018 12:23:38 +0100
Subject: [PATCH] Celery + Redis initial

---
 docker-compose.yml    |   4 ++
 kqueen/celery.py      | 130 ++++++++++++++++++++++++++++++++++++++++++
 kqueen/config/base.py |  13 +++++
 kqueen/config/dev.py  |   7 +++
 kqueen/models.py      |  21 +++--
 kqueen/server.py      |   7 ++-
 6 files changed, 175 insertions(+), 7 deletions(-)
 create mode 100644 kqueen/celery.py

diff --git a/docker-compose.yml b/docker-compose.yml
index ad66c7f5..d2390218 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -20,3 +20,7 @@ services:
       ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:4001
       ETCD_LISTEN_PEER_URLS: http://0.0.0.0:2380
       ETCD_ADVERTISE_CLIENT_URLS: http://127.0.0.1:4001,http://etcd:4001
+  redis:
+    image: redis:4
+    ports:
+      - 127.0.0.1:6379:6379
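The redis service above is published on the loopback interface only and backs both the Celery broker/result backend and the Flask cache configured later in this patch. A minimal smoke test, not part of the patch itself, assuming the redis-py client is installed (e.g. via the celery[redis] extra):

    import redis

    # Host and port come from the docker-compose port mapping above.
    conn = redis.Redis(host='127.0.0.1', port=6379)
    print(conn.ping())  # True once the container accepts connections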
diff --git a/kqueen/celery.py b/kqueen/celery.py
new file mode 100644
index 00000000..b43f23c2
--- /dev/null
+++ b/kqueen/celery.py
@@ -0,0 +1,130 @@
+from celery import Celery, group
+from kqueen.models import Cluster, Organization
+from kqueen.server import cache, create_app
+
+
+def make_celery(app):
+    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
+                    broker=app.config['CELERY_BROKER_URL'])
+    celery.conf.beat_schedule = app.config.get('CELERY_BEAT_SCHEDULE', {})
+    TaskBase = celery.Task
+    class ContextTask(TaskBase):
+        abstract = True
+        def __call__(self, *args, **kwargs):
+            with app.app_context():
+                return TaskBase.__call__(self, *args, **kwargs)
+    celery.Task = ContextTask
+    return celery
+
+
+flask_app = create_app()
+celery = make_celery(flask_app)
+
+
+class UniqueTask(celery.Task):
+    LOCK_DURATION = 60
+    _lock_key = None
+
+    def lock(self):
+        if not self._lock_key:
+            raise Exception('_lock_key needs to be set on self, before calling lock.')
+        return cache.set(self._lock_key, True, self.LOCK_DURATION)
+
+    def unlock(self):
+        if not self._lock_key:
+            raise Exception('_lock_key needs to be set on self, before calling unlock.')
+        return cache.delete(self._lock_key)
+
+    def is_locked(self):
+        if not self._lock_key:
+            raise Exception('_lock_key needs to be set on self, before calling is_locked.')
+        value = cache.get(self._lock_key)
+        return value if value else False
+
+    def run(self, *args, **kwargs):
+        '''
+        Test if locked. Lock if not. Do something. Unlock.
+        '''
+        self._lock_key = 'unique-task'
+        if self.is_locked():
+            return
+        self.lock()
+        # DO SOMETHING
+
+    def after_return(self, *args, **kwargs):
+        self.unlock()
+        return super().after_return(*args, **kwargs)
+
+
+class ClusterStatus(UniqueTask):
+    name = 'cluster_status'
+    ignore_result = True
+    time_limit = 30
+
+    def run(self, namespace, cluster_id):
+        print('Fetching status of cluster {} ...'.format(cluster_id))
+        self._lock_key = 'task-cluster-status-{}-lock'.format(cluster_id)
+        # Test lock
+        if self.is_locked():
+            return
+        # Lock
+        self.lock()
+        # Main
+        status_key = 'task-cluster-status-{}'.format(cluster_id)
+        cluster = Cluster.load(namespace, cluster_id)
+        try:
+            status = cluster.status()
+            cache.set(status_key, status, 30)
+            print('Status of cluster {} successfully cached!'.format(cluster_id))
+        except Exception:
+            # Invalidate current cache, if we are unable to contact backend
+            cache.delete(status_key)
+
+celery.tasks.register(ClusterStatus())
+
+
+class ClusterBackendData(UniqueTask):
+    name = 'cluster_backend_data'
+    ignore_result = True
+    time_limit = 30
+
+    def run(self, namespace, cluster_id):
+        print('Fetching backend data for cluster {} ...'.format(cluster_id))
+        self._lock_key = 'task-cluster-backend-data-{}-lock'.format(cluster_id)
+        # Test lock
+        if self.is_locked():
+            return
+        # Lock
+        self.lock()
+        # Main
+        backend_data_key = 'task-cluster-backend-data-{}'.format(cluster_id)
+        cluster = Cluster.load(namespace, cluster_id)
+        try:
+            backend_data = cluster.engine.cluster_get()
+            cache.set(backend_data_key, backend_data, 30)
+            print('Backend data for cluster {} successfully cached!'.format(cluster_id))
+        except Exception:
+            # Invalidate current cache, if we are unable to contact backend
+            cache.delete(backend_data_key)
+
+celery.tasks.register(ClusterBackendData())
+
+
+class UpdateClusters(celery.Task):
+    name = 'update_clusters'
+    ignore_result = True
+
+    def run(self):
+        print('Cluster update started ...')
+        get_backend_data = celery.tasks[ClusterBackendData.name]
+        get_status = celery.tasks[ClusterStatus.name]
+        namespaces = [o.namespace for o in Organization.list(None).values()]
+        for namespace in namespaces:
+            clusters = [c for c in Cluster.list(namespace).values()]
+            # Launch and forget
+            backend_data = group([get_backend_data.s(namespace, c.id) for c in clusters])
+            backend_data.apply_async()
+            statuses = group([get_status.s(namespace, c.id) for c in clusters])
+            statuses.apply_async()
+
+celery.tasks.register(UpdateClusters())
diff --git a/kqueen/config/base.py b/kqueen/config/base.py
index b9328f3c..20648148 100644
--- a/kqueen/config/base.py
+++ b/kqueen/config/base.py
@@ -44,6 +44,19 @@ class BaseConfig:
     PROVISIONER_TIMEOUT = 3600
     PROMETHEUS_WHITELIST = '127.0.0.0/8'
 
+    # Cache config
+    CACHE_TYPE = 'simple'
+
+    # Celery config
+    CELERY_BROKER_URL = 'redis://localhost:6379'
+    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
+    CELERY_BEAT_SCHEDULE = {
+        'update-cluster-every-10': {
+            'task': 'update_clusters',
+            'schedule': 10.0
+        },
+    }
+
     @classmethod
     def get(cls, name, default=None):
         """Emulate get method from dict"""
diff --git a/kqueen/config/dev.py b/kqueen/config/dev.py
index f7e4ee81..d410d871 100644
--- a/kqueen/config/dev.py
+++ b/kqueen/config/dev.py
@@ -22,3 +22,10 @@ class Config(BaseConfig):
     }
     JENKINS_USERNAME = None
     JENKINS_PASSWORD = None
+
+    # Cache config
+    CACHE_TYPE = 'redis'
+    CACHE_REDIS_HOST = 'localhost'
+    CACHE_REDIS_PORT = 6379
+    CACHE_REDIS_DB = 1
+    CACHE_REDIS_URL = 'redis://localhost:6379/1'
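The beat schedule in base.py refers to tasks by the name attribute they register in kqueen/celery.py, so update_clusters fans out the per-cluster tasks every 10 seconds. Note that BaseConfig keeps CACHE_TYPE = 'simple', a per-process in-memory cache; the UniqueTask locks and cached results are only shared between the API server and the worker when a Redis-backed cache such as the one in dev.py is active. A sketch of triggering the schedule entry by hand, assuming a worker and the Redis broker are running:

    from kqueen.celery import celery

    # 'update_clusters' is the registered task name used by the beat
    # schedule; apply_async() enqueues one run on the Redis broker.
    celery.tasks['update_clusters'].apply_async()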
diff --git a/kqueen/models.py b/kqueen/models.py
index 7d520b0d..475d2902 100644
--- a/kqueen/models.py
+++ b/kqueen/models.py
@@ -38,11 +38,14 @@ class Cluster(Model, metaclass=ModelMeta):
     owner = RelationField(required=True)
 
     def get_state(self):
-        try:
-            remote_cluster = self.engine.cluster_get()
-        except Exception as e:
-            logger.error('Unable to get data from backend for cluster {}'.format(self.name))
-            remote_cluster = {}
+        from kqueen.server import cache
+        remote_cluster = cache.get('task-cluster-backend-data-{}'.format(self.id))
+        if not remote_cluster:
+            try:
+                remote_cluster = self.engine.cluster_get()
+            except Exception as e:
+                logger.error('Unable to get data from backend for cluster {}'.format(self.name))
+                remote_cluster = {}
 
         if 'state' in remote_cluster:
             if remote_cluster['state'] == self.state:
@@ -96,6 +99,14 @@ def get_kubeconfig(self):
 
     def status(self):
         """Return information about Kubernetes cluster"""
+        if self.state == config.get('CLUSTER_PROVISIONING_STATE'):
+            return {}
+
+        from kqueen.server import cache
+        cached_status = cache.get('task-cluster-status-{}'.format(self.id))
+        if cached_status:
+            return cached_status
+
         try:
             kubernetes = KubernetesAPI(cluster=self)
 
diff --git a/kqueen/server.py b/kqueen/server.py
index a7be46a9..7ce928b3 100644
--- a/kqueen/server.py
+++ b/kqueen/server.py
@@ -8,14 +8,14 @@
 from .serializers import KqueenJSONEncoder
 from .storages.etcd import EtcdBackend
 from flask import Flask
+from flask_caching import Cache
 from flask_jwt import JWT
 from flask_swagger_ui import get_swaggerui_blueprint
-from werkzeug.contrib.cache import SimpleCache
 
 import logging
 
 logger = logging.getLogger(__name__)
-cache = SimpleCache()
+cache = Cache(config=current_config().to_dict())
 
 swagger_url = '/api/docs'
 api_url = '/api/v1/swagger'
@@ -51,6 +51,9 @@ def create_app(config_file=None):
     # setup database
     app.db = EtcdBackend()
 
+    # setup caching
+    cache.init_app(app)
+
     # setup JWT
     JWT(app, authenticate, identity)
 
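With the cache moved to flask_caching and shared through Redis, the web process reads what the worker wrote: status() and get_state() consult the task-cluster-* keys first and fall back to a live backend call only on a miss, and status() now short-circuits to {} while a cluster is still provisioning. To exercise the loop end to end, a worker and the beat scheduler both need to run against the same config, e.g. celery -A kqueen.celery worker and celery -A kqueen.celery beat (Celery 4 CLI, assuming the kqueen package is importable from the working directory).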