Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(self, topology_settings: TopologySettings):
self._publish_tp = self._listeners is not None and self._listeners.enabled_for_topology

# Create events queue if there are publishers.
self._events = None
self._events: queue.Queue[Any] | None = None
self.__events_executor: Any = None

if self._publish_server or self._publish_tp:
Expand All @@ -126,6 +126,7 @@ def __init__(self, topology_settings: TopologySettings):

if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._events.put((self._listeners.publish_topology_opened, (self._topology_id,)))
self._settings = topology_settings
topology_description = TopologyDescription(
Expand All @@ -143,6 +144,7 @@ def __init__(self, topology_settings: TopologySettings):
)
if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_topology_description_changed,
Expand All @@ -161,6 +163,7 @@ def __init__(self, topology_settings: TopologySettings):
for seed in topology_settings.seeds:
if self._publish_server:
assert self._events is not None
assert self._listeners is not None
self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id)))
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
Expand Down Expand Up @@ -491,6 +494,7 @@ async def _process_change(
suppress_event = sd_old == server_description
if self._publish_server and not suppress_event:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_server_description_changed,
Expand All @@ -503,6 +507,7 @@ async def _process_change(

if self._publish_tp and not suppress_event:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_topology_description_changed,
Expand Down Expand Up @@ -570,6 +575,7 @@ async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:

if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_topology_description_changed,
Expand Down Expand Up @@ -723,6 +729,7 @@ async def close(self) -> None:
# Publish only after releasing the lock.
if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._description = TopologyDescription(
TOPOLOGY_TYPE.Unknown,
{},
Expand Down
9 changes: 8 additions & 1 deletion pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(self, topology_settings: TopologySettings):
self._publish_tp = self._listeners is not None and self._listeners.enabled_for_topology

# Create events queue if there are publishers.
self._events = None
self._events: queue.Queue[Any] | None = None
self.__events_executor: Any = None

if self._publish_server or self._publish_tp:
Expand All @@ -126,6 +126,7 @@ def __init__(self, topology_settings: TopologySettings):

if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._events.put((self._listeners.publish_topology_opened, (self._topology_id,)))
self._settings = topology_settings
topology_description = TopologyDescription(
Expand All @@ -143,6 +144,7 @@ def __init__(self, topology_settings: TopologySettings):
)
if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_topology_description_changed,
Expand All @@ -161,6 +163,7 @@ def __init__(self, topology_settings: TopologySettings):
for seed in topology_settings.seeds:
if self._publish_server:
assert self._events is not None
assert self._listeners is not None
self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id)))
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
Expand Down Expand Up @@ -491,6 +494,7 @@ def _process_change(
suppress_event = sd_old == server_description
if self._publish_server and not suppress_event:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_server_description_changed,
Expand All @@ -503,6 +507,7 @@ def _process_change(

if self._publish_tp and not suppress_event:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_topology_description_changed,
Expand Down Expand Up @@ -570,6 +575,7 @@ def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:

if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._events.put(
(
self._listeners.publish_topology_description_changed,
Expand Down Expand Up @@ -721,6 +727,7 @@ def close(self) -> None:
# Publish only after releasing the lock.
if self._publish_tp:
assert self._events is not None
assert self._listeners is not None
self._description = TopologyDescription(
TOPOLOGY_TYPE.Unknown,
{},
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mockupdb = [
]
perf = ["simplejson>=3.17.0"]
typing = [
"mypy==1.18.2",
"mypy==1.19.0",
"pyright==1.1.407",
"typing_extensions",
"pip"
Expand Down
8 changes: 4 additions & 4 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def create_user(self, dbname, user, pwd=None, roles=None, **kwargs):
def drop_user(self, dbname, user):
self.client[dbname].command("dropUser", user, writeConcern={"w": self.w})

def require_connection(self, func):
def require_connection(self, func: Any) -> Any:
"""Run a test only if we can connect to MongoDB."""
return self._require(
lambda: True, # _require checks if we're connected
Expand Down Expand Up @@ -552,7 +552,7 @@ def require_no_fips(self, func):
lambda: not self.fips_enabled, "Test cannot run on a FIPS-enabled host", func=func
)

def require_replica_set(self, func):
def require_replica_set(self, func: Any) -> Any:
"""Run a test only if the client is connected to a replica set."""
return self._require(lambda: self.is_rs, "Not connected to a replica set", func=func)

Expand Down Expand Up @@ -638,7 +638,7 @@ def require_load_balancer(self, func):
lambda: self.load_balancer, "Must be connected to a load balancer", func=func
)

def require_no_load_balancer(self, func):
def require_no_load_balancer(self, func: Any) -> Any:
"""Run a test only if the client is not connected to a load balancer."""
return self._require(
lambda: not self.load_balancer, "Must not be connected to a load balancer", func=func
Expand Down Expand Up @@ -687,7 +687,7 @@ def require_test_commands(self, func):
lambda: self.test_commands_enabled, "Test commands must be enabled", func=func
)

def require_failCommand_fail_point(self, func):
def require_failCommand_fail_point(self, func: Any) -> Any:
"""Run a test only if the server supports the failCommand fail
point.
"""
Expand Down
8 changes: 4 additions & 4 deletions test/asynchronous/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ async def create_user(self, dbname, user, pwd=None, roles=None, **kwargs):
async def drop_user(self, dbname, user):
await self.client[dbname].command("dropUser", user, writeConcern={"w": self.w})

def require_connection(self, func):
def require_connection(self, func: Any) -> Any:
"""Run a test only if we can connect to MongoDB."""
return self._require(
lambda: True, # _require checks if we're connected
Expand Down Expand Up @@ -552,7 +552,7 @@ def require_no_fips(self, func):
lambda: not self.fips_enabled, "Test cannot run on a FIPS-enabled host", func=func
)

def require_replica_set(self, func):
def require_replica_set(self, func: Any) -> Any:
"""Run a test only if the client is connected to a replica set."""
return self._require(lambda: self.is_rs, "Not connected to a replica set", func=func)

Expand Down Expand Up @@ -638,7 +638,7 @@ def require_load_balancer(self, func):
lambda: self.load_balancer, "Must be connected to a load balancer", func=func
)

def require_no_load_balancer(self, func):
def require_no_load_balancer(self, func: Any) -> Any:
"""Run a test only if the client is not connected to a load balancer."""
return self._require(
lambda: not self.load_balancer, "Must not be connected to a load balancer", func=func
Expand Down Expand Up @@ -687,7 +687,7 @@ def require_test_commands(self, func):
lambda: self.test_commands_enabled, "Test commands must be enabled", func=func
)

def require_failCommand_fail_point(self, func):
def require_failCommand_fail_point(self, func: Any) -> Any:
"""Run a test only if the server supports the failCommand fail
point.
"""
Expand Down
8 changes: 4 additions & 4 deletions test/asynchronous/test_change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,8 @@ async def test_split_large_change(self):
class TestClusterAsyncChangeStream(TestAsyncChangeStreamBase, APITestsMixin):
dbs: list

@async_client_context.require_version_min(4, 2, 0)
@async_client_context.require_change_streams
@async_client_context.require_version_min(4, 2, 0) # type:ignore[untyped-decorator]
@async_client_context.require_change_streams # type:ignore[untyped-decorator]
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
self.dbs = [self.db, self.client.pymongo_test_2]
Expand Down Expand Up @@ -831,8 +831,8 @@ async def test_full_pipeline(self):


class TestAsyncDatabaseAsyncChangeStream(TestAsyncChangeStreamBase, APITestsMixin):
@async_client_context.require_version_min(4, 2, 0)
@async_client_context.require_change_streams
@async_client_context.require_version_min(4, 2, 0) # type:ignore[untyped-decorator]
@async_client_context.require_change_streams # type:ignore[untyped-decorator]
async def asyncSetUp(self) -> None:
await super().asyncSetUp()

Expand Down
2 changes: 1 addition & 1 deletion test/asynchronous/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class AsyncEncryptionIntegrationTest(AsyncIntegrationTest):
"""Base class for encryption integration tests."""

@unittest.skipUnless(_HAVE_PYMONGOCRYPT, "pymongocrypt is not installed")
@async_client_context.require_version_min(4, 2, -1)
@async_client_context.require_version_min(4, 2, -1) # type:ignore[untyped-decorator]
async def asyncSetUp(self) -> None:
await super().asyncSetUp()

Expand Down
8 changes: 4 additions & 4 deletions test/test_change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,8 @@ def test_split_large_change(self):
class TestClusterChangeStream(TestChangeStreamBase, APITestsMixin):
dbs: list

@client_context.require_version_min(4, 2, 0)
@client_context.require_change_streams
@client_context.require_version_min(4, 2, 0) # type:ignore[untyped-decorator]
@client_context.require_change_streams # type:ignore[untyped-decorator]
def setUp(self) -> None:
super().setUp()
self.dbs = [self.db, self.client.pymongo_test_2]
Expand Down Expand Up @@ -817,8 +817,8 @@ def test_full_pipeline(self):


class TestDatabaseChangeStream(TestChangeStreamBase, APITestsMixin):
@client_context.require_version_min(4, 2, 0)
@client_context.require_change_streams
@client_context.require_version_min(4, 2, 0) # type:ignore[untyped-decorator]
@client_context.require_change_streams # type:ignore[untyped-decorator]
def setUp(self) -> None:
super().setUp()

Expand Down
2 changes: 1 addition & 1 deletion test/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class EncryptionIntegrationTest(IntegrationTest):
"""Base class for encryption integration tests."""

@unittest.skipUnless(_HAVE_PYMONGOCRYPT, "pymongocrypt is not installed")
@client_context.require_version_min(4, 2, -1)
@client_context.require_version_min(4, 2, -1) # type:ignore[untyped-decorator]
def setUp(self) -> None:
super().setUp()

Expand Down
Loading
Loading