diff --git a/redis/asyncio/sentinel.py b/redis/asyncio/sentinel.py index d0455ab6eb..019c25747b 100644 --- a/redis/asyncio/sentinel.py +++ b/redis/asyncio/sentinel.py @@ -27,6 +27,10 @@ class SlaveNotFoundError(ConnectionError): pass +# Alias for REPLICA terminology (Redis 5.0+) +ReplicaNotFoundError = SlaveNotFoundError + + class SentinelManagedConnection(Connection): def __init__(self, **kwargs): self.connection_pool = kwargs.pop("connection_pool") @@ -166,6 +170,11 @@ async def rotate_slaves(self) -> AsyncIterator: pass raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + async def rotate_replicas(self) -> AsyncIterator: + """Round-robin replica balancer""" + async for replica in self.rotate_slaves(): + yield replica + class Sentinel(AsyncSentinelCommands): """ @@ -318,6 +327,12 @@ def filter_slaves( slaves_alive.append((slave["ip"], slave["port"])) return slaves_alive + def filter_replicas( + self, replicas: Iterable[Mapping] + ) -> Sequence[Tuple[EncodableT, EncodableT]]: + """Remove replicas that are in an ODOWN or SDOWN state""" + return self.filter_slaves(replicas) + async def discover_slaves( self, service_name: str ) -> Sequence[Tuple[EncodableT, EncodableT]]: @@ -332,6 +347,12 @@ async def discover_slaves( return slaves return [] + async def discover_replicas( + self, service_name: str + ) -> Sequence[Tuple[EncodableT, EncodableT]]: + """Returns a list of alive replicas for service ``service_name``""" + return await self.discover_slaves(service_name) + def master_for( self, service_name: str, @@ -402,3 +423,34 @@ def slave_for( connection_pool = connection_pool_class(service_name, self, **connection_kwargs) # The Redis object "owns" the pool return redis_class.from_pool(connection_pool) + + def replica_for( + self, + service_name: str, + redis_class: Type[Redis] = Redis, + connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, + **kwargs, + ): + """ + Returns redis client instance for the ``service_name`` replica(s). + + A SentinelConnectionPool class is used to retrieve the replica's + address before establishing a new connection. + + By default clients will be a :py:class:`~redis.Redis` instance. + Specify a different class to the ``redis_class`` argument if you + desire something different. + + The ``connection_pool_class`` specifies the connection pool to use. + The SentinelConnectionPool will be used by default. + + All other keyword arguments are merged with any connection_kwargs + passed to this class and passed to the connection pool as keyword + arguments to be used to initialize Redis connections. + """ + return self.slave_for( + service_name, + redis_class=redis_class, + connection_pool_class=connection_pool_class, + **kwargs, + ) diff --git a/redis/sentinel.py b/redis/sentinel.py index f12bd8dd5d..2e6dc55347 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -21,6 +21,10 @@ class SlaveNotFoundError(ConnectionError): pass +# Alias for REPLICA terminology (Redis 5.0+) +ReplicaNotFoundError = SlaveNotFoundError + + class SentinelManagedConnection(Connection): def __init__(self, **kwargs): self.connection_pool = kwargs.pop("connection_pool") @@ -198,6 +202,10 @@ def rotate_slaves(self): "Round-robin slave balancer" return self.proxy.rotate_slaves() + def rotate_replicas(self): + "Round-robin replica balancer" + return self.rotate_slaves() + class Sentinel(SentinelCommands): """ @@ -343,6 +351,10 @@ def filter_slaves(self, slaves): slaves_alive.append((slave["ip"], slave["port"])) return slaves_alive + def filter_replicas(self, replicas): + "Remove replicas that are in an ODOWN or SDOWN state" + return self.filter_slaves(replicas) + def discover_slaves(self, service_name): "Returns a list of alive slaves for service ``service_name``" for sentinel in self.sentinels: @@ -355,6 +367,10 @@ def discover_slaves(self, service_name): return slaves return [] + def discover_replicas(self, service_name): + "Returns a list of alive replicas for service ``service_name``" + return self.discover_slaves(service_name) + def master_for( self, service_name, @@ -423,3 +439,34 @@ def slave_for( return redis_class.from_pool( connection_pool_class(service_name, self, **connection_kwargs) ) + + def replica_for( + self, + service_name, + redis_class=Redis, + connection_pool_class=SentinelConnectionPool, + **kwargs, + ): + """ + Returns redis client instance for the ``service_name`` replica(s). + + A SentinelConnectionPool class is used to retrieve the replica's + address before establishing a new connection. + + By default clients will be a :py:class:`~redis.Redis` instance. + Specify a different class to the ``redis_class`` argument if you + desire something different. + + The ``connection_pool_class`` specifies the connection pool to use. + The SentinelConnectionPool will be used by default. + + All other keyword arguments are merged with any connection_kwargs + passed to this class and passed to the connection pool as keyword + arguments to be used to initialize Redis connections. + """ + return self.slave_for( + service_name, + redis_class=redis_class, + connection_pool_class=connection_pool_class, + **kwargs, + ) diff --git a/tests/test_asyncio/test_sentinel.py b/tests/test_asyncio/test_sentinel.py index 87532ae5d5..c03a719f64 100644 --- a/tests/test_asyncio/test_sentinel.py +++ b/tests/test_asyncio/test_sentinel.py @@ -9,6 +9,7 @@ from redis import exceptions from redis.asyncio.sentinel import ( MasterNotFoundError, + ReplicaNotFoundError, Sentinel, SentinelConnectionPool, SlaveNotFoundError, @@ -396,3 +397,65 @@ async def test_sentinel_commands_with_strict_redis_client(request): assert isinstance(await client.sentinel_ckquorum("redis-py-test"), bool) await client.close() + + +# Tests for REPLICA aliases (Redis 5.0+ terminology) +@pytest.mark.onlynoncluster +async def test_replica_not_found_error_alias(): + """Test that ReplicaNotFoundError is an alias for SlaveNotFoundError""" + assert ReplicaNotFoundError is SlaveNotFoundError + + +@pytest.mark.onlynoncluster +async def test_replica_for_alias(cluster, sentinel): + """Test that replica_for() is an alias for slave_for()""" + cluster.slaves = [ + {"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False} + ] + async with sentinel.replica_for("mymaster", db=9) as replica: + assert await replica.ping() + + +@pytest.mark.onlynoncluster +async def test_discover_replicas_alias(cluster, sentinel): + """Test that discover_replicas() is an alias for discover_slaves()""" + cluster.slaves = [ + {"ip": "slave0", "port": 1234, "is_odown": False, "is_sdown": False}, + {"ip": "slave1", "port": 1234, "is_odown": False, "is_sdown": False}, + ] + # discover_replicas should return the same result as discover_slaves + replicas = await sentinel.discover_replicas("mymaster") + slaves = await sentinel.discover_slaves("mymaster") + assert replicas == slaves + assert replicas == [("slave0", 1234), ("slave1", 1234)] + + +@pytest.mark.onlynoncluster +async def test_filter_replicas_alias(cluster, sentinel): + """Test that filter_replicas() is an alias for filter_slaves()""" + replicas = [ + {"ip": "replica0", "port": 1234, "is_odown": False, "is_sdown": False}, + {"ip": "replica1", "port": 1234, "is_odown": True, "is_sdown": False}, + ] + # filter_replicas should return the same result as filter_slaves + filtered_replicas = sentinel.filter_replicas(replicas) + filtered_slaves = sentinel.filter_slaves(replicas) + assert filtered_replicas == filtered_slaves + assert filtered_replicas == [("replica0", 1234)] + + +@pytest.mark.onlynoncluster +async def test_rotate_replicas_alias(cluster, sentinel, master_ip): + """Test that rotate_replicas() is an alias for rotate_slaves()""" + cluster.slaves = [ + {"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False}, + {"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False}, + ] + pool = SentinelConnectionPool("mymaster", sentinel) + rotator = pool.rotate_replicas() + assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379)) + assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379)) + # Fallback to master + assert await rotator.__anext__() == (master_ip, 6379) + with pytest.raises(SlaveNotFoundError): + await rotator.__anext__() diff --git a/tests/test_sentinel.py b/tests/test_sentinel.py index fb4a2d4056..e884d4d3df 100644 --- a/tests/test_sentinel.py +++ b/tests/test_sentinel.py @@ -8,6 +8,7 @@ from redis import exceptions from redis.sentinel import ( MasterNotFoundError, + ReplicaNotFoundError, Sentinel, SentinelConnectionPool, SlaveNotFoundError, @@ -373,3 +374,65 @@ def test_sentinel_commands_with_strict_redis_client(request): assert isinstance(client.sentinel_ckquorum("redis-py-test"), bool) client.close() + + +# Tests for REPLICA aliases (Redis 5.0+ terminology) +@pytest.mark.onlynoncluster +def test_replica_not_found_error_alias(): + """Test that ReplicaNotFoundError is an alias for SlaveNotFoundError""" + assert ReplicaNotFoundError is SlaveNotFoundError + + +@pytest.mark.onlynoncluster +def test_replica_for_alias(cluster, sentinel): + """Test that replica_for() is an alias for slave_for()""" + cluster.slaves = [ + {"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False} + ] + replica = sentinel.replica_for("mymaster", db=9) + assert replica.ping() + + +@pytest.mark.onlynoncluster +def test_discover_replicas_alias(cluster, sentinel): + """Test that discover_replicas() is an alias for discover_slaves()""" + cluster.slaves = [ + {"ip": "slave0", "port": 1234, "is_odown": False, "is_sdown": False}, + {"ip": "slave1", "port": 1234, "is_odown": False, "is_sdown": False}, + ] + # discover_replicas should return the same result as discover_slaves + replicas = sentinel.discover_replicas("mymaster") + slaves = sentinel.discover_slaves("mymaster") + assert replicas == slaves + assert replicas == [("slave0", 1234), ("slave1", 1234)] + + +@pytest.mark.onlynoncluster +def test_filter_replicas_alias(cluster, sentinel): + """Test that filter_replicas() is an alias for filter_slaves()""" + replicas = [ + {"ip": "replica0", "port": 1234, "is_odown": False, "is_sdown": False}, + {"ip": "replica1", "port": 1234, "is_odown": True, "is_sdown": False}, + ] + # filter_replicas should return the same result as filter_slaves + filtered_replicas = sentinel.filter_replicas(replicas) + filtered_slaves = sentinel.filter_slaves(replicas) + assert filtered_replicas == filtered_slaves + assert filtered_replicas == [("replica0", 1234)] + + +@pytest.mark.onlynoncluster +def test_rotate_replicas_alias(cluster, sentinel, master_ip): + """Test that rotate_replicas() is an alias for rotate_slaves()""" + cluster.slaves = [ + {"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False}, + {"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False}, + ] + pool = SentinelConnectionPool("mymaster", sentinel) + rotator = pool.rotate_replicas() + assert next(rotator) in (("slave0", 6379), ("slave1", 6379)) + assert next(rotator) in (("slave0", 6379), ("slave1", 6379)) + # Fallback to master + assert next(rotator) == (master_ip, 6379) + with pytest.raises(SlaveNotFoundError): + next(rotator)