Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ services:
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:4001
ETCD_LISTEN_PEER_URLS: http://0.0.0.0:2380
ETCD_ADVERTISE_CLIENT_URLS: http://127.0.0.1:4001,http://etcd:4001
redis:
image: redis:4
ports:
- 127.0.0.1:6379:6379
130 changes: 130 additions & 0 deletions kqueen/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from celery import Celery, group
from kqueen.models import Cluster, Organization
from kqueen.server import cache, create_app


def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.beat_schedule = app.config.get('CELERY_BEAT_SCHEDULE', {})
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery


flask_app = create_app()
celery = make_celery(flask_app)


class UniqueTask(celery.Task):
LOCK_DURATION = 60
_lock_key = None

def lock(self):
if not self._lock_key:
raise Exception('_lock_key needs to be set on self, before calling lock.')
return cache.set(self._lock_key, True, self.LOCK_DURATION)

def unlock(self):
if not self._lock_key:
raise Exception('_lock_key needs to be set on self, before calling unlock.')
return cache.delete(self._lock_key)

def is_locked(self):
if not self._lock_key:
raise Exception('_lock_key needs to be set on self, before calling is_locked.')
value = cache.get(self._lock_key)
return value if value else False

def run(self, *args, **kwargs):
'''
Test if locked. Lock if not. Do something. Unlock.
'''
self._lock_key = 'unique-task'
if self.is_locked():
return
self.lock()
# DO SOMETHING

def after_return(self, *args, **kwargs):
self.unlock()
return super().after_return(*args, **kwargs)


class ClusterStatus(UniqueTask):
name = 'cluster_status'
ignore_result = True
time_limit = 30

def run(self, namespace, cluster_id):
print('Fetching status of cluster {} ...'.format(cluster_id))
self._lock_key = 'task-cluster-status-{}-lock'.format(cluster_id)
# Test lock
if self.is_locked():
return
# Lock
self.lock()
# Main
status_key = 'task-cluster-status-{}'.format(cluster_id)
cluster = Cluster.load(namespace, cluster_id)
try:
status = cluster.status()
cache.set(status_key, status, 30)
print('Status of cluster {} successfully cached!'.format(cluster_id))
except Exception:
# Invalidate current cache, if we are unable to contact backend
cache.delete(status_key)

celery.tasks.register(ClusterStatus())


class ClusterBackendData(UniqueTask):
name = 'cluster_backend_data'
ignore_result = True
time_limit = 30

def run(self, namespace, cluster_id):
print('Fetching backend data for cluster {} ...'.format(cluster_id))
self._lock_key = 'task-cluster-backend-data-{}-lock'.format(cluster_id)
# Test lock
if self.is_locked():
return
# Lock
self.lock()
# Main
backend_data_key = 'task-cluster-backend-data-{}'.format(cluster_id)
cluster = Cluster.load(namespace, cluster_id)
try:
backend_data = cluster.engine.cluster_get()
cache.set(backend_data_key, backend_data, 30)
print('Backend data for cluster {} successfully cached!'.format(cluster_id))
except Exception:
# Invalidate current cache, if we are unable to contact backend
cache.delete(backend_data_key)

celery.tasks.register(ClusterBackendData())


class UpdateClusters(celery.Task):
name = 'update_clusters'
ignore_result = True

def run(self):
print('Cluster update started ...')
get_backend_data = celery.tasks[ClusterBackendData.name]
get_status = celery.tasks[ClusterStatus.name]
namespaces = [o.namespace for o in Organization.list(None).values()]
for namespace in namespaces:
clusters = [c for c in Cluster.list(namespace).values()]
# Launch and forget
backend_data = group([get_backend_data.s(namespace, c.id) for c in clusters])
backend_data.apply_async()
statuses = group([get_status.s(namespace, c.id) for c in clusters])
statuses.apply_async()

celery.tasks.register(UpdateClusters())
13 changes: 13 additions & 0 deletions kqueen/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ class BaseConfig:
PROVISIONER_TIMEOUT = 3600
PROMETHEUS_WHITELIST = '127.0.0.0/8'

# Cache config
CACHE_TYPE = 'simple'

# Celery config
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_BEAT_SCHEDULE = {
'update-cluster-every-10': {
'task': 'update_clusters',
'schedule': 10.0
},
}

@classmethod
def get(cls, name, default=None):
"""Emulate get method from dict"""
Expand Down
7 changes: 7 additions & 0 deletions kqueen/config/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ class Config(BaseConfig):
}
JENKINS_USERNAME = None
JENKINS_PASSWORD = None

# Cache config
CACHE_TYPE = 'redis'
CACHE_REDIS_HOST = 'localhost'
CACHE_REDIS_PORT = 6379
CACHE_REDIS_DB = 1
CACHE_REDIS_URL = 'redis://localhost:6379/1'
21 changes: 16 additions & 5 deletions kqueen/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ class Cluster(Model, metaclass=ModelMeta):
owner = RelationField(required=True)

def get_state(self):
try:
remote_cluster = self.engine.cluster_get()
except Exception as e:
logger.error('Unable to get data from backend for cluster {}'.format(self.name))
remote_cluster = {}
from kqueen.server import cache
remote_cluster = cache.get('task-cluster-backend-data-{}'.format(self.id))
if not remote_cluster:
try:
remote_cluster = self.engine.cluster_get()
except Exception as e:
logger.error('Unable to get data from backend for cluster {}'.format(self.name))
remote_cluster = {}

if 'state' in remote_cluster:
if remote_cluster['state'] == self.state:
Expand Down Expand Up @@ -96,6 +99,14 @@ def get_kubeconfig(self):

def status(self):
"""Return information about Kubernetes cluster"""
if self.state == config.get('CLUSTER_PROVISIONING_STATE'):
return {}

from kqueen.server import cache
cached_status = cache.get('task-cluster-status-{}'.format(self.id))
if cached_status:
return cached_status

try:
kubernetes = KubernetesAPI(cluster=self)

Expand Down
7 changes: 5 additions & 2 deletions kqueen/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from .serializers import KqueenJSONEncoder
from .storages.etcd import EtcdBackend
from flask import Flask
from flask_caching import Cache
from flask_jwt import JWT
from flask_swagger_ui import get_swaggerui_blueprint
from werkzeug.contrib.cache import SimpleCache

import logging

logger = logging.getLogger(__name__)
cache = SimpleCache()
cache = Cache(config=current_config().to_dict())
swagger_url = '/api/docs'
api_url = '/api/v1/swagger'

Expand Down Expand Up @@ -51,6 +51,9 @@ def create_app(config_file=None):
# setup database
app.db = EtcdBackend()

# setup caching
cache.init_app(app)

# setup JWT
JWT(app, authenticate, identity)

Expand Down