Add proper server-side query support for instances #26
Changes from all commits
```diff
@@ -1,20 +1,22 @@
 """
 Redis instance domain model and storage helpers.

-TODO: Use a better persistence structure than serializing a list of JSON
-objects into a string.
+Instances are stored as Redis Hash documents with a RediSearch index for
+efficient querying by environment, usage, status, instance_type, and user_id.
 """

 from __future__ import annotations

 import json
 import logging
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from enum import Enum
 from typing import Any, Dict, List, Optional

 from pydantic import BaseModel, Field, SecretStr, field_serializer, field_validator
+from redisvl.query import CountQuery, FilterQuery
+from redisvl.query.filter import FilterExpression, Tag

 from .encryption import encrypt_secret, get_secret_value
 from .keys import RedisKeys
```
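The index and hash layout referenced by the new docstring are created elsewhere in this module (`_ensure_instances_index_exists` / `get_instances_index`, neither shown in this diff). As a rough orientation, a redisvl schema covering the fields the new queries filter and sort on might look like the sketch below; the index name, key prefix, and field types are assumptions, not taken from this PR.

```python
# Illustrative sketch only -- the real schema lives behind get_instances_index(),
# which this diff does not show. Field names come from the filters used below;
# the index name, key prefix, and field types are assumptions.
from redisvl.schema import IndexSchema

instances_schema = IndexSchema.from_dict(
    {
        "index": {
            "name": "sre_instances",    # assumed index name
            "prefix": "sre_instances",  # assumed key prefix
            "storage_type": "hash",     # instances are stored as Redis Hashes
        },
        "fields": [
            {"name": "name", "type": "tag"},
            {"name": "environment", "type": "tag"},
            {"name": "usage", "type": "tag"},
            {"name": "status", "type": "tag"},
            {"name": "instance_type", "type": "tag"},
            {"name": "user_id", "type": "tag"},
            # sort_by("updated_at") below implies a sortable field; a numeric
            # epoch timestamp is one way to model it.
            {"name": "updated_at", "type": "numeric", "attrs": {"sortable": True}},
        ],
    }
)
# The serialized instance JSON sits in a separate "data" hash field; for hash
# documents it can be returned via return_fields without being indexed.
```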
@@ -348,6 +350,132 @@ async def get_instances() -> List[RedisInstance]: | |
| return [] | ||
|
|
||
|
|
||
| @dataclass | ||
| class InstanceQueryResult: | ||
| """Result of an instance query with pagination info.""" | ||
|
|
||
| instances: List[RedisInstance] | ||
| total: int | ||
| limit: int | ||
| offset: int | ||
|
|
||
|
|
||
| async def query_instances( | ||
| *, | ||
| environment: Optional[str] = None, | ||
| usage: Optional[str] = None, | ||
| status: Optional[str] = None, | ||
| instance_type: Optional[str] = None, | ||
| user_id: Optional[str] = None, | ||
| search: Optional[str] = None, | ||
| limit: int = 100, | ||
| offset: int = 0, | ||
| ) -> InstanceQueryResult: | ||
| """Query instances with server-side filtering and pagination. | ||
|
|
||
| Args: | ||
| environment: Filter by environment (development, staging, production) | ||
| usage: Filter by usage type (cache, analytics, session, queue, custom) | ||
| status: Filter by status (healthy, unhealthy, unknown) | ||
| instance_type: Filter by type (oss_single, oss_cluster, redis_enterprise, redis_cloud) | ||
| user_id: Filter by user ID | ||
| search: Text search on instance name | ||
| limit: Maximum number of results (default 100, max 1000) | ||
| offset: Number of results to skip for pagination | ||
|
|
||
| Returns: | ||
| InstanceQueryResult with instances list, total count, and pagination info | ||
| """ | ||
| try: | ||
| await _ensure_instances_index_exists() | ||
| index = await get_instances_index() | ||
|
|
||
| # Build filter expression from provided parameters | ||
| filter_expr = None | ||
|
|
||
| if environment: | ||
| env_filter = Tag("environment") == environment.lower() | ||
| filter_expr = env_filter if filter_expr is None else (filter_expr & env_filter) | ||
|
|
||
| if usage: | ||
| usage_filter = Tag("usage") == usage.lower() | ||
| filter_expr = usage_filter if filter_expr is None else (filter_expr & usage_filter) | ||
|
|
||
| if status: | ||
| status_filter = Tag("status") == status.lower() | ||
| filter_expr = status_filter if filter_expr is None else (filter_expr & status_filter) | ||
|
|
||
| if instance_type: | ||
| type_filter = Tag("instance_type") == instance_type.lower() | ||
| filter_expr = type_filter if filter_expr is None else (filter_expr & type_filter) | ||
|
|
||
| if user_id: | ||
| user_filter = Tag("user_id") == user_id | ||
| filter_expr = user_filter if filter_expr is None else (filter_expr & user_filter) | ||
|
|
||
| if search: | ||
| # Use raw FilterExpression for wildcard matching (Tag escapes wildcards) | ||
| name_filter = FilterExpression(f"@name:{{*{search}*}}") | ||
| filter_expr = name_filter if filter_expr is None else (filter_expr & name_filter) | ||
|
|
||
| # Get total count with filter | ||
| count_expr = filter_expr if filter_expr is not None else "*" | ||
| try: | ||
| total = await index.query(CountQuery(filter_expression=count_expr)) | ||
| total = int(total) if isinstance(total, int) else 0 | ||
| except Exception: | ||
| total = 0 | ||
|
|
||
| if total == 0: | ||
| return InstanceQueryResult(instances=[], total=0, limit=limit, offset=offset) | ||
|
|
||
| # Build query with pagination | ||
| # Clamp limit to reasonable bounds | ||
| limit = max(1, min(limit, 1000)) | ||
| offset = max(0, offset) | ||
|
|
||
| fq = FilterQuery( | ||
| return_fields=["data"], | ||
| num_results=limit, | ||
| ).sort_by("updated_at", asc=False) | ||
|
|
||
| if filter_expr is not None: | ||
| fq.set_filter(filter_expr) | ||
|
|
||
| fq.paging(offset, limit) | ||
|
Comment on lines +437 to +445
```diff
+        results = await index.query(fq)
+
+        # Parse results
+        instances: List[RedisInstance] = []
+        for doc in results or []:
+            try:
+                raw = doc.get("data")
+                if not raw:
+                    continue
+                if isinstance(raw, bytes):
+                    raw = raw.decode("utf-8")
+                inst_data = json.loads(raw)
+                if inst_data.get("connection_url"):
+                    inst_data["connection_url"] = get_secret_value(inst_data["connection_url"])
+                if inst_data.get("admin_password"):
+                    inst_data["admin_password"] = get_secret_value(inst_data["admin_password"])
+                instances.append(RedisInstance(**inst_data))
+            except Exception as e:
+                logger.error("Failed to load instance from query result: %s. Skipping.", e)
+
+        return InstanceQueryResult(
+            instances=instances,
+            total=total,
+            limit=limit,
+            offset=offset,
+        )
+
+    except Exception as e:
+        logger.error("Failed to query instances: %s", e)
+        return InstanceQueryResult(instances=[], total=0, limit=limit, offset=offset)
+
+
 # --- Instances search index helpers (non-breaking integration) ---
 async def _ensure_instances_index_exists() -> None:
     try:
```
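For orientation, here is a hedged sketch of how a caller might combine the new filters with pagination; the filter values and the wrapper function are illustrative, not part of this PR.

```python
# Hypothetical caller, e.g. behind a list endpoint. Values are illustrative.
async def list_production_caches(page: int, page_size: int = 25) -> dict:
    result = await query_instances(
        environment="production",
        usage="cache",
        search="billing",        # matches instances whose name contains "billing"
        limit=page_size,
        offset=page * page_size,
    )
    return {
        "items": [inst.name for inst in result.instances],
        "total": result.total,   # total matches, independent of this page
        "limit": result.limit,
        "offset": result.offset,
    }
```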
```diff
@@ -622,23 +750,81 @@ async def create_instance(


 # Convenience lookups
 async def get_instance_by_id(instance_id: str) -> Optional[RedisInstance]:
-    for inst in await get_instances():
-        if inst.id == instance_id:
-            return inst
-    return None
+    """Get a single instance by ID using direct key lookup.
+
+    This is more efficient than get_instances() when you only need one instance,
+    as it does a direct HGET on the instance key instead of searching.
+    """
+    try:
+        client = get_redis_client()
+        key = f"{SRE_INSTANCES_INDEX}:{instance_id}"
+        data = await client.hget(key, "data")
+
+        if not data:
+            return None
+
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        inst_data = json.loads(data)
+        if inst_data.get("connection_url"):
+            inst_data["connection_url"] = get_secret_value(inst_data["connection_url"])
+        if inst_data.get("admin_password"):
+            inst_data["admin_password"] = get_secret_value(inst_data["admin_password"])
+
+        return RedisInstance(**inst_data)
+    except Exception as e:
+        logger.error("Failed to get instance by ID %s: %s", instance_id, e)
+        return None


 async def get_instance_by_name(instance_name: str) -> Optional[RedisInstance]:
-    for inst in await get_instances():
-        if inst.name == instance_name:
-            return inst
-    return None
+    """Get a single instance by name using index query.
+
+    Uses a RediSearch tag match on the name field for efficient lookup.
+    """
+    try:
+        await _ensure_instances_index_exists()
+        index = await get_instances_index()
+
+        # Use exact tag match on name field
+        fq = FilterQuery(
+            return_fields=["data"],
+            num_results=1,
+        )
+        fq.set_filter(Tag("name") == instance_name)
+
+        results = await index.query(fq)
+
+        if not results:
+            return None
+
+        doc = results[0]
+        raw = doc.get("data")
+        if not raw:
+            return None
+
+        if isinstance(raw, bytes):
+            raw = raw.decode("utf-8")
+
+        inst_data = json.loads(raw)
+        if inst_data.get("connection_url"):
+            inst_data["connection_url"] = get_secret_value(inst_data["connection_url"])
+        if inst_data.get("admin_password"):
+            inst_data["admin_password"] = get_secret_value(inst_data["admin_password"])
+
+        return RedisInstance(**inst_data)
+    except Exception as e:
+        logger.error("Failed to get instance by name %s: %s", instance_name, e)
+        return None


 async def get_instance_map() -> Dict[str, RedisInstance]:
     """Get all instances as a dict keyed by instance ID."""
     return {inst.id: inst for inst in await get_instances()}


 async def get_instance_name(instance_id: str) -> Optional[str]:
     """Get just the name of an instance by ID."""
     inst = await get_instance_by_id(instance_id)
     return inst.name if inst else None
```
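A minimal usage sketch for the rewritten lookups; the ID below is made up and only illustrates the two lookup paths.

```python
# Hypothetical ID for illustration only.
async def show_instance(instance_id: str = "inst-123") -> None:
    inst = await get_instance_by_id(instance_id)        # direct HGET on "<prefix>:<id>"
    if inst is None:
        print(f"{instance_id} not found")
        return
    # Name-based lookup goes through the RediSearch index instead.
    same = await get_instance_by_name(inst.name)
    assert same is not None and same.id == inst.id
    print(await get_instance_name(instance_id))         # convenience wrapper
```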
Review comment: The new `search` parameter is not covered by tests. Consider adding test cases for the search functionality to verify that text search behaves as expected, especially for partial matches and edge cases such as empty strings or special characters.
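A sketch of what such tests might look like, assuming pytest-asyncio and a hypothetical `seeded_instances` fixture that creates three instances named `cache-alpha`, `cache-beta`, and `queue-gamma` against a test Redis with RediSearch; the fixture, names, and import path are not part of this PR.

```python
import pytest

# query_instances is imported from the module under test (path omitted here).
# seeded_instances is a hypothetical fixture that creates instances named
# "cache-alpha", "cache-beta", and "queue-gamma" in a test Redis.


@pytest.mark.asyncio
async def test_search_matches_partial_name(seeded_instances):
    result = await query_instances(search="cache")
    assert {inst.name for inst in result.instances} == {"cache-alpha", "cache-beta"}
    assert result.total == 2


@pytest.mark.asyncio
async def test_empty_search_applies_no_name_filter(seeded_instances):
    # An empty string is falsy, so query_instances skips the name filter entirely.
    result = await query_instances(search="")
    assert result.total == 3
```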