Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions redis_sre_agent/api/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import List, Optional
from urllib.parse import urlparse

from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field, SecretStr, field_serializer, field_validator

from redis_sre_agent.core import instances as core_instances
Expand Down Expand Up @@ -93,6 +93,15 @@ class RedisInstanceResponse(BaseModel):
redis_cloud_database_name: Optional[str] = None


class InstanceListResponse(BaseModel):
    """Response model for paginated instance list with filtering.

    Wraps one page of instances together with pagination metadata so
    clients can tell whether more pages exist (offset + len(instances) < total).
    """

    # Current page of instances (credentials are masked by to_response).
    instances: List[RedisInstanceResponse]
    total: int = Field(..., description="Total number of instances matching the filter")
    limit: int = Field(..., description="Maximum number of results returned")
    offset: int = Field(..., description="Number of results skipped")


class CreateInstanceRequest(BaseModel):
"""Request model for creating a Redis instance."""

Expand Down Expand Up @@ -268,12 +277,48 @@ def dump_secret(self, v):
connections: Optional[int] = None


@router.get("/instances", response_model=InstanceListResponse)
async def list_instances(
    environment: Optional[str] = Query(
        None, description="Filter by environment (development, staging, production)"
    ),
    usage: Optional[str] = Query(
        None, description="Filter by usage type (cache, analytics, session, queue, custom)"
    ),
    status: Optional[str] = Query(
        None, description="Filter by status (healthy, unhealthy, unknown)"
    ),
    instance_type: Optional[str] = Query(
        None,
        description="Filter by type (oss_single, oss_cluster, redis_enterprise, redis_cloud)",
    ),
    user_id: Optional[str] = Query(None, description="Filter by user ID"),
    search: Optional[str] = Query(None, description="Search by instance name"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum number of results"),
    offset: int = Query(0, ge=0, description="Number of results to skip for pagination"),
):
    """List Redis instances with server-side filtering and pagination.

    All filters are optional. When no filters are provided, returns all instances.
    Results are sorted by updated_at descending (most recently updated first).

    Returns:
        InstanceListResponse with the matching page of instances (credentials
        masked via ``to_response``) plus total/limit/offset metadata.

    Raises:
        HTTPException: 500 when the underlying query fails.
    """
    try:
        result = await core_instances.query_instances(
            environment=environment,
            usage=usage,
            status=status,
            instance_type=instance_type,
            user_id=user_id,
            search=search,
            limit=limit,
            offset=offset,
        )
        return InstanceListResponse(
            instances=[to_response(inst) for inst in result.instances],
            total=result.total,
            limit=result.limit,
            offset=result.offset,
        )
    except Exception as e:
        # Lazy %-style logging (no f-string work unless the record is emitted)
        # and explicit exception chaining so the traceback keeps the root cause
        # while the client still sees an opaque 500.
        logger.error("Failed to list instances: %s", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve instances") from e
Expand Down
206 changes: 196 additions & 10 deletions redis_sre_agent/core/instances.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
"""
Redis instance domain model and storage helpers.

TODO: Use a better persistence structure than serializing a list of JSON
objects into a string.
Instances are stored as Redis Hash documents with a RediSearch index for
efficient querying by environment, usage, status, instance_type, and user_id.
"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, SecretStr, field_serializer, field_validator
from redisvl.query import CountQuery, FilterQuery
from redisvl.query.filter import FilterExpression, Tag

from .encryption import encrypt_secret, get_secret_value
from .keys import RedisKeys
Expand Down Expand Up @@ -348,6 +350,132 @@ async def get_instances() -> List[RedisInstance]:
return []


@dataclass
class InstanceQueryResult:
    """Result of an instance query with pagination info."""

    # One page of decrypted RedisInstance objects (at most `limit` entries).
    instances: List[RedisInstance]
    # Total number of matches for the filter across all pages.
    total: int
    # Page size used for the query (query_instances clamps it to 1..1000).
    limit: int
    # Number of results skipped before this page (clamped to >= 0).
    offset: int


def _instance_from_doc(doc: Any) -> Optional[RedisInstance]:
    """Parse one search-result document's ``data`` JSON into a RedisInstance.

    Decrypts secrets stored encrypted at rest. Returns None (and logs) on any
    failure so a single corrupt record does not break a whole listing.
    """
    try:
        raw = doc.get("data")
        if not raw:
            return None
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        inst_data = json.loads(raw)
        # Secrets are stored encrypted; decrypt before model construction.
        for secret_field in ("connection_url", "admin_password"):
            if inst_data.get(secret_field):
                inst_data[secret_field] = get_secret_value(inst_data[secret_field])
        return RedisInstance(**inst_data)
    except Exception as e:
        logger.error("Failed to load instance from query result: %s. Skipping.", e)
        return None


async def query_instances(
    *,
    environment: Optional[str] = None,
    usage: Optional[str] = None,
    status: Optional[str] = None,
    instance_type: Optional[str] = None,
    user_id: Optional[str] = None,
    search: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> InstanceQueryResult:
    """Query instances with server-side filtering and pagination.

    Args:
        environment: Filter by environment (development, staging, production)
        usage: Filter by usage type (cache, analytics, session, queue, custom)
        status: Filter by status (healthy, unhealthy, unknown)
        instance_type: Filter by type (oss_single, oss_cluster, redis_enterprise, redis_cloud)
        user_id: Filter by user ID
        search: Text search on instance name
        limit: Maximum number of results (default 100, clamped to 1..1000)
        offset: Number of results to skip for pagination (clamped to >= 0)

    Returns:
        InstanceQueryResult with instances list, total count, and pagination
        info. On any failure an empty result is returned (errors are logged,
        not raised).
    """
    # Clamp pagination up front so the values echoed back in the result are
    # consistent on every path (early return, error path, and success).
    limit = max(1, min(limit, 1000))
    offset = max(0, offset)

    try:
        await _ensure_instances_index_exists()
        index = await get_instances_index()

        # Collect every requested condition, then AND them together once.
        # Tag() escapes special characters, so tag filters are safe for
        # user-supplied values.
        conditions: List[FilterExpression] = []
        if environment:
            conditions.append(Tag("environment") == environment.lower())
        if usage:
            conditions.append(Tag("usage") == usage.lower())
        if status:
            conditions.append(Tag("status") == status.lower())
        if instance_type:
            conditions.append(Tag("instance_type") == instance_type.lower())
        if user_id:
            conditions.append(Tag("user_id") == user_id)
        if search:
            # Raw FilterExpression because Tag escapes the wildcards we need
            # for partial matching.
            # NOTE(review): `search` is interpolated unescaped into the query
            # string; RediSearch syntax characters in user input could alter
            # the query. Consider sanitizing/escaping `search` here.
            conditions.append(FilterExpression(f"@name:{{*{search}*}}"))

        filter_expr: Optional[FilterExpression] = None
        for cond in conditions:
            filter_expr = cond if filter_expr is None else (filter_expr & cond)

        # Total count for the same filter (best-effort: count failures are
        # treated as "no matches" rather than surfacing an error).
        count_expr = filter_expr if filter_expr is not None else "*"
        try:
            raw_total = await index.query(CountQuery(filter_expression=count_expr))
            total = raw_total if isinstance(raw_total, int) else 0
        except Exception:
            total = 0

        if total == 0:
            return InstanceQueryResult(instances=[], total=0, limit=limit, offset=offset)

        # paging() is the single source of truth for the result window, so we
        # do not also pass a redundant num_results to the constructor.
        fq = FilterQuery(return_fields=["data"]).sort_by("updated_at", asc=False)
        if filter_expr is not None:
            fq.set_filter(filter_expr)
        fq.paging(offset, limit)

        results = await index.query(fq)

        instances: List[RedisInstance] = []
        for doc in results or []:
            inst = _instance_from_doc(doc)
            if inst is not None:
                instances.append(inst)

        return InstanceQueryResult(
            instances=instances,
            total=total,
            limit=limit,
            offset=offset,
        )

    except Exception as e:
        logger.error("Failed to query instances: %s", e)
        return InstanceQueryResult(instances=[], total=0, limit=limit, offset=offset)


# --- Instances search index helpers (non-breaking integration) ---
async def _ensure_instances_index_exists() -> None:
try:
Expand Down Expand Up @@ -622,23 +750,81 @@ async def create_instance(

# Convenience lookups
async def get_instance_by_id(instance_id: str) -> Optional[RedisInstance]:
    """Get a single instance by ID using direct key lookup.

    More efficient than get_instances() when only one instance is needed:
    it reads the ``data`` field of the instance's hash key directly instead
    of running a search query.
    """
    try:
        key = f"{SRE_INSTANCES_INDEX}:{instance_id}"
        raw = await get_redis_client().hget(key, "data")
        if not raw:
            return None

        payload = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        fields = json.loads(payload)

        # Secrets are stored encrypted at rest; decrypt before building the model.
        for secret_field in ("connection_url", "admin_password"):
            if fields.get(secret_field):
                fields[secret_field] = get_secret_value(fields[secret_field])

        return RedisInstance(**fields)
    except Exception as e:
        logger.error("Failed to get instance by ID %s: %s", instance_id, e)
        return None


async def get_instance_by_name(instance_name: str) -> Optional[RedisInstance]:
    """Get a single instance by name using index query.

    Performs an exact tag match on the indexed ``name`` field and returns
    the first hit, or None when nothing matches or loading fails.
    """
    try:
        await _ensure_instances_index_exists()
        index = await get_instances_index()

        # Exact tag match; ask for a single document only.
        query = FilterQuery(return_fields=["data"], num_results=1)
        query.set_filter(Tag("name") == instance_name)

        matches = await index.query(query)
        if not matches:
            return None

        raw = matches[0].get("data")
        if not raw:
            return None

        payload = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        fields = json.loads(payload)

        # Decrypt stored secrets before constructing the model.
        for secret_field in ("connection_url", "admin_password"):
            if fields.get(secret_field):
                fields[secret_field] = get_secret_value(fields[secret_field])

        return RedisInstance(**fields)
    except Exception as e:
        logger.error("Failed to get instance by name %s: %s", instance_name, e)
        return None


async def get_instance_map() -> Dict[str, RedisInstance]:
    """Get all instances as a dict keyed by instance ID."""
    mapping: Dict[str, RedisInstance] = {}
    for inst in await get_instances():
        mapping[inst.id] = inst
    return mapping


async def get_instance_name(instance_id: str) -> Optional[str]:
    """Get just the name of an instance by ID."""
    instance = await get_instance_by_id(instance_id)
    if instance is None:
        return None
    return instance.name
2 changes: 1 addition & 1 deletion redis_sre_agent/core/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
"storage_type": "hash",
},
"fields": [
{"name": "name", "type": "text"},
{"name": "name", "type": "tag"},
{"name": "environment", "type": "tag"},
{"name": "usage", "type": "tag"},
{"name": "instance_type", "type": "tag"},
Expand Down
Loading
Loading