Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ source .venv/bin/activate

Install package in editable mode.
```shell
poetry install --with dev,test,lint
uv sync --group test
```

Start PostgreSQL/PGVector.
Expand All @@ -28,5 +28,6 @@ docker run --rm -it --name pgvector-container \

Invoke test cases.
```shell
export POSTGRES_PORT=6024
pytest -vvv
```
97 changes: 97 additions & 0 deletions langchain_postgres/v2/async_vectorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,60 @@ async def __query_collection(
return combined_results
return dense_results

async def __query_collection_with_filter(
    self,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    filter: Optional[dict] = None,
    **kwargs: Any,
) -> Sequence[RowMapping]:
    """Query the collection with an optional metadata filter.

    Args:
        limit: Maximum number of rows to return. Defaults to ``self.k``.
        offset: Number of rows to skip before returning results. Defaults to 0.
        filter: Optional metadata filter dict, translated into a SQL WHERE
            clause by ``self._create_filter_clause``.
        **kwargs: Accepted for interface compatibility; ignored here.

    Returns:
        The matching rows, containing the id, content, embedding, and
        metadata columns of the table.
    """
    limit = self.k if limit is None else limit
    offset = 0 if offset is None else offset

    # Select every column needed to rebuild a Document from a row.
    columns = [
        self.id_column,
        self.content_column,
        self.embedding_column,
    ] + self.metadata_columns
    if self.metadata_json_column:
        columns.append(self.metadata_json_column)

    column_names = ", ".join(f'"{col}"' for col in columns)

    safe_filter = None
    filter_dict = None
    # An empty dict produces no WHERE clause, matching the previous behavior.
    if isinstance(filter, dict) and filter:
        safe_filter, filter_dict = self._create_filter_clause(filter)

    # Unique suffix keeps the bind-parameter names from colliding with any
    # parameter names produced by the filter clause.
    suffix_id = str(uuid.uuid4()).split("-")[0]
    where_filters = f"WHERE {safe_filter}" if safe_filter else ""
    dense_query_stmt = f"""SELECT {column_names}
    FROM "{self.schema_name}"."{self.table_name}" {where_filters} LIMIT :limit_{suffix_id} OFFSET :offset_{suffix_id};
    """
    param_dict = {f"limit_{suffix_id}": limit, f"offset_{suffix_id}": offset}
    if filter_dict:
        param_dict.update(filter_dict)

    # Single connection block; the SET LOCAL loop only runs when index query
    # options are configured (previously this whole sequence was duplicated
    # across an if/else).
    async with self.engine.connect() as conn:
        if self.index_query_options:
            # Set each query option individually; SET LOCAL scopes the
            # option to the current transaction only.
            for query_option in self.index_query_options.to_parameter():
                query_options_stmt = f"SET LOCAL {query_option};"
                await conn.execute(text(query_options_stmt))
        result = await conn.execute(text(dense_query_stmt), param_dict)
        results = result.mappings().fetchall()

    return results

async def asimilarity_search(
self,
query: str,
Expand Down Expand Up @@ -995,6 +1049,38 @@ async def is_valid_index(
results = result_map.fetchall()
return bool(len(results) == 1)

async def aget(
    self,
    filter: Optional[dict] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    **kwargs: Any,
) -> list[Document]:
    """Retrieve documents from the collection using filters and parameters."""
    rows = await self.__query_collection_with_filter(
        limit=limit, offset=offset, filter=filter, **kwargs
    )

    documents: list[Document] = []
    for row in rows:
        # Start from the JSON metadata column when present and non-empty,
        # then overlay the dedicated metadata columns on top of it.
        if self.metadata_json_column and row[self.metadata_json_column]:
            metadata = row[self.metadata_json_column]
        else:
            metadata = {}
        metadata.update({col: row[col] for col in self.metadata_columns})

        documents.append(
            Document(
                page_content=row[self.content_column],
                metadata=metadata,
                id=str(row[self.id_column]),
            )
        )

    return documents

async def aget_by_ids(self, ids: Sequence[str]) -> list[Document]:
"""Get documents by ids."""

Expand Down Expand Up @@ -1249,6 +1335,17 @@ def _create_filter_clause(self, filters: Any) -> tuple[str, dict]:
else:
return "", {}

def get(
    self,
    filter: Optional[dict] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    **kwargs: Any,
) -> list[Document]:
    """Synchronous ``get`` is unsupported on the async store; always raises.

    Raises:
        NotImplementedError: Always. Use ``aget`` or the PGVectorStore
            wrapper instead.
    """
    message = (
        "Sync methods are not implemented for AsyncPGVectorStore. "
        "Use PGVectorStore interface instead."
    )
    raise NotImplementedError(message)

def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
raise NotImplementedError(
"Sync methods are not implemented for AsyncPGVectorStore. Use PGVectorStore interface instead."
Expand Down
24 changes: 24 additions & 0 deletions langchain_postgres/v2/vectorstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,5 +875,29 @@ def get_by_ids(self, ids: Sequence[str]) -> list[Document]:
"""Get documents by ids."""
return self._engine._run_as_sync(self.__vs.aget_by_ids(ids=ids))

async def aget(
    self,
    filter: Optional[dict] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    **kwargs: Any,
) -> list[Document]:
    """Retrieve documents from the collection using filters and parameters."""
    # Delegate to the underlying async store, scheduled on the engine's loop.
    coro = self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs)
    return await self._engine._run_as_async(coro)

def get(
    self,
    filter: Optional[dict] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    **kwargs: Any,
) -> list[Document]:
    """Retrieve documents from the collection using filters and parameters."""
    # Run the async implementation to completion on the engine's event loop.
    coro = self.__vs.aget(filter=filter, limit=limit, offset=offset, **kwargs)
    return self._engine._run_as_sync(coro)

def get_table_name(self) -> str:
    """Return the name of the database table backing this vector store."""
    return self.__vs.table_name
28 changes: 28 additions & 0 deletions tests/unit_tests/v2/test_async_pg_vectorstore_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,34 @@ async def test_vectorstore_with_metadata_filters(
)
assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter

@pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES)
async def test_vectorstore_get(
    self,
    vs_custom_filter: AsyncPGVectorStore,
    test_filter: dict,
    expected_ids: list[str],
) -> None:
    """Test end to end construction and filter."""
    docs = await vs_custom_filter.aget(test_filter)
    # Order is not part of the contract, so compare as sets.
    codes = {doc.metadata["code"] for doc in docs}
    assert codes == set(expected_ids), test_filter

async def test_vectorstore_get_limit_offset(
    self,
    vs_custom_filter: AsyncPGVectorStore,
) -> None:
    """Test limit and offset parameters of get method"""
    # NOTE(review): aget() issues a SELECT without ORDER BY, so PostgreSQL
    # does not guarantee row order; this equality check assumes ordering is
    # stable across the four consecutive queries — confirm, or the test may
    # be flaky.

    # Paginating with limit/offset (1 + 1 + rest) should reproduce the full
    # result set exactly.
    all_docs = await vs_custom_filter.aget()
    docs_from_combining = (
        (await vs_custom_filter.aget(limit=1))
        + (await vs_custom_filter.aget(limit=1, offset=1))
        + (await vs_custom_filter.aget(offset=2))
    )

    assert all_docs == docs_from_combining

async def test_asimilarity_hybrid_search(self, vs: AsyncPGVectorStore) -> None:
results = await vs.asimilarity_search(
"foo", k=1, hybrid_search_config=HybridSearchConfig()
Expand Down
29 changes: 29 additions & 0 deletions tests/unit_tests/v2/test_pg_vectorstore_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,35 @@ def test_sync_vectorstore_with_metadata_filters(
docs = vs_custom_filter_sync.similarity_search("meow", k=5, filter=test_filter)
assert [doc.metadata["code"] for doc in docs] == expected_ids, test_filter

@pytest.mark.parametrize("test_filter, expected_ids", FILTERING_TEST_CASES)
def test_sync_vectorstore_get(
    self,
    vs_custom_filter_sync: PGVectorStore,
    test_filter: dict,
    expected_ids: list[str],
) -> None:
    """Test end to end construction and filter."""
    docs = vs_custom_filter_sync.get(filter=test_filter)
    # Order is not part of the contract, so compare as sets.
    codes = {doc.metadata["code"] for doc in docs}
    assert codes == set(expected_ids), test_filter

def test_sync_vectorstore_get_limit_offset(
    self,
    vs_custom_filter_sync: PGVectorStore,
) -> None:
    """Test limit and offset parameters of get method"""
    # NOTE(review): get() runs a SELECT without ORDER BY, so PostgreSQL does
    # not guarantee row order; this equality check assumes ordering is stable
    # across the four consecutive queries — confirm, or the test may be
    # flaky.

    # Paginating with limit/offset (1 + 1 + rest) should reproduce the full
    # result set exactly.
    all_docs = vs_custom_filter_sync.get()
    docs_from_combining = (
        vs_custom_filter_sync.get(limit=1)
        + vs_custom_filter_sync.get(limit=1, offset=1)
        + vs_custom_filter_sync.get(offset=2)
    )

    assert all_docs == docs_from_combining

@pytest.mark.parametrize("test_filter", NEGATIVE_TEST_CASES)
def test_metadata_filter_negative_tests(
self, vs_custom_filter_sync: PGVectorStore, test_filter: dict
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.