|
1 | 1 | """ |
2 | 2 | Redis instance domain model and storage helpers. |
3 | 3 |
|
4 | | -TODO: Use a better persistence structure than serializing a list of JSON |
5 | | - objects into a string. |
| 4 | +Instances are stored as Redis Hash documents with a RediSearch index for |
| 5 | +efficient querying by environment, usage, status, instance_type, and user_id. |
6 | 6 | """ |
7 | 7 |
|
8 | 8 | from __future__ import annotations |
9 | 9 |
|
10 | 10 | import json |
11 | 11 | import logging |
| 12 | +from dataclasses import dataclass |
12 | 13 | from datetime import datetime, timezone |
13 | 14 | from enum import Enum |
14 | 15 | from typing import Any, Dict, List, Optional |
15 | 16 |
|
16 | 17 | from pydantic import BaseModel, Field, SecretStr, field_serializer, field_validator |
17 | 18 | from redisvl.query import CountQuery, FilterQuery |
| 19 | +from redisvl.query.filter import FilterExpression, Tag |
18 | 20 |
|
19 | 21 | from .encryption import encrypt_secret, get_secret_value |
20 | 22 | from .keys import RedisKeys |
@@ -348,6 +350,132 @@ async def get_instances() -> List[RedisInstance]: |
348 | 350 | return [] |
349 | 351 |
|
350 | 352 |
|
| 353 | +@dataclass |
| 354 | +class InstanceQueryResult: |
| 355 | + """Result of an instance query with pagination info.""" |
| 356 | + |
| 357 | + instances: List[RedisInstance] |
| 358 | + total: int |
| 359 | + limit: int |
| 360 | + offset: int |
| 361 | + |
| 362 | + |
| 363 | +async def query_instances( |
| 364 | + *, |
| 365 | + environment: Optional[str] = None, |
| 366 | + usage: Optional[str] = None, |
| 367 | + status: Optional[str] = None, |
| 368 | + instance_type: Optional[str] = None, |
| 369 | + user_id: Optional[str] = None, |
| 370 | + search: Optional[str] = None, |
| 371 | + limit: int = 100, |
| 372 | + offset: int = 0, |
| 373 | +) -> InstanceQueryResult: |
| 374 | + """Query instances with server-side filtering and pagination. |
| 375 | +
|
| 376 | + Args: |
| 377 | + environment: Filter by environment (development, staging, production) |
| 378 | + usage: Filter by usage type (cache, analytics, session, queue, custom) |
| 379 | + status: Filter by status (healthy, unhealthy, unknown) |
| 380 | + instance_type: Filter by type (oss_single, oss_cluster, redis_enterprise, redis_cloud) |
| 381 | + user_id: Filter by user ID |
| 382 | + search: Text search on instance name |
| 383 | + limit: Maximum number of results (default 100, max 1000) |
| 384 | + offset: Number of results to skip for pagination |
| 385 | +
|
| 386 | + Returns: |
| 387 | + InstanceQueryResult with instances list, total count, and pagination info |
| 388 | + """ |
| 389 | + try: |
| 390 | + await _ensure_instances_index_exists() |
| 391 | + index = await get_instances_index() |
| 392 | + |
| 393 | + # Build filter expression from provided parameters |
| 394 | + filter_expr = None |
| 395 | + |
| 396 | + if environment: |
| 397 | + env_filter = Tag("environment") == environment.lower() |
| 398 | + filter_expr = env_filter if filter_expr is None else (filter_expr & env_filter) |
| 399 | + |
| 400 | + if usage: |
| 401 | + usage_filter = Tag("usage") == usage.lower() |
| 402 | + filter_expr = usage_filter if filter_expr is None else (filter_expr & usage_filter) |
| 403 | + |
| 404 | + if status: |
| 405 | + status_filter = Tag("status") == status.lower() |
| 406 | + filter_expr = status_filter if filter_expr is None else (filter_expr & status_filter) |
| 407 | + |
| 408 | + if instance_type: |
| 409 | + type_filter = Tag("instance_type") == instance_type.lower() |
| 410 | + filter_expr = type_filter if filter_expr is None else (filter_expr & type_filter) |
| 411 | + |
| 412 | + if user_id: |
| 413 | + user_filter = Tag("user_id") == user_id |
| 414 | + filter_expr = user_filter if filter_expr is None else (filter_expr & user_filter) |
| 415 | + |
| 416 | + if search: |
| 417 | + # Use raw FilterExpression for wildcard matching (Tag escapes wildcards) |
| 418 | + name_filter = FilterExpression(f"@name:{{*{search}*}}") |
| 419 | + filter_expr = name_filter if filter_expr is None else (filter_expr & name_filter) |
| 420 | + |
| 421 | + # Get total count with filter |
| 422 | + count_expr = filter_expr if filter_expr is not None else "*" |
| 423 | + try: |
| 424 | + total = await index.query(CountQuery(filter_expression=count_expr)) |
| 425 | + total = int(total) if isinstance(total, int) else 0 |
| 426 | + except Exception: |
| 427 | + total = 0 |
| 428 | + |
| 429 | + if total == 0: |
| 430 | + return InstanceQueryResult(instances=[], total=0, limit=limit, offset=offset) |
| 431 | + |
| 432 | + # Build query with pagination |
| 433 | + # Clamp limit to reasonable bounds |
| 434 | + limit = max(1, min(limit, 1000)) |
| 435 | + offset = max(0, offset) |
| 436 | + |
| 437 | + fq = FilterQuery( |
| 438 | + return_fields=["data"], |
| 439 | + num_results=limit, |
| 440 | + ).sort_by("updated_at", asc=False) |
| 441 | + |
| 442 | + if filter_expr is not None: |
| 443 | + fq.set_filter(filter_expr) |
| 444 | + |
| 445 | + fq.paging(offset, limit) |
| 446 | + |
| 447 | + results = await index.query(fq) |
| 448 | + |
| 449 | + # Parse results |
| 450 | + instances: List[RedisInstance] = [] |
| 451 | + for doc in results or []: |
| 452 | + try: |
| 453 | + raw = doc.get("data") |
| 454 | + if not raw: |
| 455 | + continue |
| 456 | + if isinstance(raw, bytes): |
| 457 | + raw = raw.decode("utf-8") |
| 458 | + inst_data = json.loads(raw) |
| 459 | + if inst_data.get("connection_url"): |
| 460 | + inst_data["connection_url"] = get_secret_value(inst_data["connection_url"]) |
| 461 | + if inst_data.get("admin_password"): |
| 462 | + inst_data["admin_password"] = get_secret_value(inst_data["admin_password"]) |
| 463 | + instances.append(RedisInstance(**inst_data)) |
| 464 | + except Exception as e: |
| 465 | + logger.error("Failed to load instance from query result: %s. Skipping.", e) |
| 466 | + |
| 467 | + return InstanceQueryResult( |
| 468 | + instances=instances, |
| 469 | + total=total, |
| 470 | + limit=limit, |
| 471 | + offset=offset, |
| 472 | + ) |
| 473 | + |
| 474 | + except Exception as e: |
| 475 | + logger.error("Failed to query instances: %s", e) |
| 476 | + return InstanceQueryResult(instances=[], total=0, limit=limit, offset=offset) |
| 477 | + |
| 478 | + |
351 | 479 | # --- Instances search index helpers (non-breaking integration) --- |
352 | 480 | async def _ensure_instances_index_exists() -> None: |
353 | 481 | try: |
@@ -622,23 +750,81 @@ async def create_instance( |
622 | 750 |
|
623 | 751 | # Convenience lookups |
624 | 752 | async def get_instance_by_id(instance_id: str) -> Optional[RedisInstance]: |
625 | | - for inst in await get_instances(): |
626 | | - if inst.id == instance_id: |
627 | | - return inst |
628 | | - return None |
| 753 | + """Get a single instance by ID using direct key lookup. |
| 754 | +
|
| 755 | + This is more efficient than get_instances() when you only need one instance, |
| 756 | + as it does a direct HGETALL on the instance key instead of searching. |
| 757 | + """ |
| 758 | + try: |
| 759 | + client = get_redis_client() |
| 760 | + key = f"{SRE_INSTANCES_INDEX}:{instance_id}" |
| 761 | + data = await client.hget(key, "data") |
| 762 | + |
| 763 | + if not data: |
| 764 | + return None |
| 765 | + |
| 766 | + if isinstance(data, bytes): |
| 767 | + data = data.decode("utf-8") |
| 768 | + |
| 769 | + inst_data = json.loads(data) |
| 770 | + if inst_data.get("connection_url"): |
| 771 | + inst_data["connection_url"] = get_secret_value(inst_data["connection_url"]) |
| 772 | + if inst_data.get("admin_password"): |
| 773 | + inst_data["admin_password"] = get_secret_value(inst_data["admin_password"]) |
| 774 | + |
| 775 | + return RedisInstance(**inst_data) |
| 776 | + except Exception as e: |
| 777 | + logger.error("Failed to get instance by ID %s: %s", instance_id, e) |
| 778 | + return None |
629 | 779 |
|
630 | 780 |
|
631 | 781 | async def get_instance_by_name(instance_name: str) -> Optional[RedisInstance]: |
632 | | - for inst in await get_instances(): |
633 | | - if inst.name == instance_name: |
634 | | - return inst |
635 | | - return None |
| 782 | + """Get a single instance by name using index query. |
| 783 | +
|
| 784 | + Uses RediSearch text search on the name field for efficient lookup. |
| 785 | + """ |
| 786 | + try: |
| 787 | + await _ensure_instances_index_exists() |
| 788 | + index = await get_instances_index() |
| 789 | + |
| 790 | + # Use exact tag match on name field |
| 791 | + fq = FilterQuery( |
| 792 | + return_fields=["data"], |
| 793 | + num_results=1, |
| 794 | + ) |
| 795 | + fq.set_filter(Tag("name") == instance_name) |
| 796 | + |
| 797 | + results = await index.query(fq) |
| 798 | + |
| 799 | + if not results: |
| 800 | + return None |
| 801 | + |
| 802 | + doc = results[0] |
| 803 | + raw = doc.get("data") |
| 804 | + if not raw: |
| 805 | + return None |
| 806 | + |
| 807 | + if isinstance(raw, bytes): |
| 808 | + raw = raw.decode("utf-8") |
| 809 | + |
| 810 | + inst_data = json.loads(raw) |
| 811 | + if inst_data.get("connection_url"): |
| 812 | + inst_data["connection_url"] = get_secret_value(inst_data["connection_url"]) |
| 813 | + if inst_data.get("admin_password"): |
| 814 | + inst_data["admin_password"] = get_secret_value(inst_data["admin_password"]) |
| 815 | + |
| 816 | + return RedisInstance(**inst_data) |
| 817 | + except Exception as e: |
| 818 | + logger.error("Failed to get instance by name %s: %s", instance_name, e) |
| 819 | + return None |
636 | 820 |
|
637 | 821 |
|
638 | 822 | async def get_instance_map() -> Dict[str, RedisInstance]: |
| 823 | + """Get all instances as a dict keyed by instance ID.""" |
639 | 824 | return {inst.id: inst for inst in await get_instances()} |
640 | 825 |
|
641 | 826 |
|
642 | 827 | async def get_instance_name(instance_id: str) -> Optional[str]: |
| 828 | + """Get just the name of an instance by ID.""" |
643 | 829 | inst = await get_instance_by_id(instance_id) |
644 | 830 | return inst.name if inst else None |
0 commit comments