From e00665a6855f6b9014d5a3586b4e25f438e025bf Mon Sep 17 00:00:00 2001 From: 0xrushi <0xrushi> Date: Fri, 9 May 2025 03:22:35 -0400 Subject: [PATCH 1/4] add thread safe locks --- langchain_postgres/vectorstores.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/langchain_postgres/vectorstores.py b/langchain_postgres/vectorstores.py index a3743e4b..95f6e456 100644 --- a/langchain_postgres/vectorstores.py +++ b/langchain_postgres/vectorstores.py @@ -6,6 +6,7 @@ import logging import uuid import warnings +import threading from typing import ( Any, AsyncGenerator, @@ -100,11 +101,14 @@ class DistanceStrategy(str, enum.Enum): .union(SPECIAL_CASED_OPERATORS) ) +_embedding_collection_store_lock = threading.Lock() def _get_embedding_collection_store(vector_dimension: Optional[int] = None) -> Any: global _classes - if _classes is not None: - return _classes + + with _embedding_collection_store_lock: + if _classes is not None: + return _classes from pgvector.sqlalchemy import Vector # type: ignore @@ -514,10 +518,13 @@ async def acreate_vector_extension(self) -> None: async with self._async_engine.begin() as conn: await conn.run_sync(_create_vector_extension) + _create_tables_lock = threading.Lock() def create_tables_if_not_exists(self) -> None: - with self._make_sync_session() as session: - Base.metadata.create_all(session.get_bind()) - session.commit() + """Create tables if they don't exist in a thread-safe manner.""" + with _create_tables_lock: + with self._make_sync_session() as session: + Base.metadata.create_all(session.get_bind()) + session.commit() async def acreate_tables_if_not_exists(self) -> None: assert self._async_engine, "This method must be called with async_mode" From 5c2fd97e7b050cc742f82d33db55a1dee0a94a40 Mon Sep 17 00:00:00 2001 From: 0xrushi <0xrushi> Date: Fri, 9 May 2025 03:32:56 -0400 Subject: [PATCH 2/4] extend_existing optional --- langchain_postgres/vectorstores.py | 43 ++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/langchain_postgres/vectorstores.py b/langchain_postgres/vectorstores.py index 95f6e456..f94a1cca 100644 --- a/langchain_postgres/vectorstores.py +++ b/langchain_postgres/vectorstores.py @@ -103,7 +103,7 @@ class DistanceStrategy(str, enum.Enum): _embedding_collection_store_lock = threading.Lock() -def _get_embedding_collection_store(vector_dimension: Optional[int] = None) -> Any: +def _get_embedding_collection_store(vector_dimension: Optional[int] = None, extend_existing: bool = False) -> Any: global _classes with _embedding_collection_store_lock: @@ -117,6 +117,9 @@ class CollectionStore(Base): __tablename__ = "langchain_pg_collection" + if extend_existing: + __table_args__ = {'extend_existing': True} + uuid = sqlalchemy.Column( UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 ) @@ -219,14 +222,25 @@ class EmbeddingStore(Base): document = sqlalchemy.Column(sqlalchemy.String, nullable=True) cmetadata = sqlalchemy.Column(JSONB, nullable=True) - __table_args__ = ( - sqlalchemy.Index( - "ix_cmetadata_gin", - "cmetadata", - postgresql_using="gin", - postgresql_ops={"cmetadata": "jsonb_path_ops"}, - ), - ) + if extend_existing: + __table_args__ = ( + sqlalchemy.Index( + "ix_cmetadata_gin", + "cmetadata", + postgresql_using="gin", + postgresql_ops={"cmetadata": "jsonb_path_ops"}, + ), + {'extend_existing': True} + ) + else: + __table_args__ = ( + sqlalchemy.Index( + "ix_cmetadata_gin", + "cmetadata", + postgresql_using="gin", + postgresql_ops={"cmetadata": "jsonb_path_ops"}, + ), + ) _classes = (EmbeddingStore, CollectionStore) @@ -391,6 +405,7 @@ def __init__( use_jsonb: bool = True, create_extension: bool = True, async_mode: bool = False, + extend_existing: bool = False, ) -> None: """Initialize the PGVector store. For an async version, use `PGVector.acreate()` instead. @@ -419,6 +434,9 @@ def __init__( create_extension: If True, will create the vector extension if it doesn't exist. disabling creation is useful when using ReadOnly Databases. + extend_existing: If True, will set extend_existing=True in table_args for + SQLAlchemy models. This helps prevent race conditions when multiple + threads try to create tables simultaneously. (default: False) """ self.async_mode = async_mode self.embedding_function = embeddings @@ -432,6 +450,7 @@ def __init__( self._engine: Optional[Engine] = None self._async_engine: Optional[AsyncEngine] = None self._async_init = False + self.extend_existing = extend_existing if isinstance(connection, str): if async_mode: @@ -474,7 +493,8 @@ def __post_init__( self.create_vector_extension() EmbeddingStore, CollectionStore = _get_embedding_collection_store( - self._embedding_length + self._embedding_length, + extend_existing=self.extend_existing, ) self.CollectionStore = CollectionStore self.EmbeddingStore = EmbeddingStore @@ -490,7 +510,8 @@ async def __apost_init__( self._async_init = True EmbeddingStore, CollectionStore = _get_embedding_collection_store( - self._embedding_length + self._embedding_length, + extend_existing=self.extend_existing, ) self.CollectionStore = CollectionStore self.EmbeddingStore = EmbeddingStore From 4ed9b1393cd49875b3eb718ae363004645aeb945 Mon Sep 17 00:00:00 2001 From: Rushi Chaudhari <6279035+0xrushi@users.noreply.github.com> Date: Fri, 9 May 2025 20:32:51 -0400 Subject: [PATCH 3/4] Update langchain_postgres/vectorstores.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Martín Gotelli Ferenaz --- langchain_postgres/vectorstores.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/langchain_postgres/vectorstores.py b/langchain_postgres/vectorstores.py index f94a1cca..568c870e 100644 --- a/langchain_postgres/vectorstores.py +++ b/langchain_postgres/vectorstores.py @@ -222,18 +222,7 @@ class EmbeddingStore(Base): document = sqlalchemy.Column(sqlalchemy.String, nullable=True) cmetadata = sqlalchemy.Column(JSONB, nullable=True) - if extend_existing: - __table_args__ = ( - sqlalchemy.Index( - "ix_cmetadata_gin", - "cmetadata", - postgresql_using="gin", - postgresql_ops={"cmetadata": "jsonb_path_ops"}, - ), - {'extend_existing': True} - ) - else: - __table_args__ = ( + __table_args__ = ( sqlalchemy.Index( "ix_cmetadata_gin", "cmetadata", @@ -241,6 +230,8 @@ class EmbeddingStore(Base): postgresql_ops={"cmetadata": "jsonb_path_ops"}, ), ) + if extend_existing: + __table_args__ = __table_args__ + ({'extend_existing': True}, ) _classes = (EmbeddingStore, CollectionStore) From c44057573b249bd87a78702e28e81fcc395264f8 Mon Sep 17 00:00:00 2001 From: 0xrushi <0xrushi> Date: Fri, 9 May 2025 20:36:37 -0400 Subject: [PATCH 4/4] make extend_existing private --- langchain_postgres/vectorstores.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/langchain_postgres/vectorstores.py b/langchain_postgres/vectorstores.py index 568c870e..a9873b31 100644 --- a/langchain_postgres/vectorstores.py +++ b/langchain_postgres/vectorstores.py @@ -441,7 +441,7 @@ def __init__( self._engine: Optional[Engine] = None self._async_engine: Optional[AsyncEngine] = None self._async_init = False - self.extend_existing = extend_existing + self._extend_existing = extend_existing if isinstance(connection, str): if async_mode: @@ -485,7 +485,7 @@ def __post_init__( EmbeddingStore, CollectionStore = _get_embedding_collection_store( self._embedding_length, - extend_existing=self.extend_existing, + extend_existing=self._extend_existing, ) self.CollectionStore = CollectionStore self.EmbeddingStore = EmbeddingStore @@ -502,7 +502,7 @@ async def __apost_init__( EmbeddingStore, CollectionStore = _get_embedding_collection_store( self._embedding_length, - extend_existing=self.extend_existing, + extend_existing=self._extend_existing, ) self.CollectionStore = CollectionStore self.EmbeddingStore = EmbeddingStore