Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ably/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from ably.rest.rest import AblyRest
import logging

from ably.realtime.realtime import AblyRealtime
from ably.rest.auth import Auth
from ably.rest.push import Push
from ably.rest.rest import AblyRest
from ably.types.capability import Capability
from ably.types.channelsubscription import PushChannelSubscription
from ably.types.device import DeviceDetails
from ably.types.options import Options, VCDiffDecoder
from ably.util.crypto import CipherParams
from ably.util.exceptions import AblyException, AblyAuthException, IncompatibleClientIdException
from ably.util.exceptions import AblyAuthException, AblyException, IncompatibleClientIdException
from ably.vcdiff.default_vcdiff_decoder import AblyVCDiffDecoder

import logging

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Expand Down
12 changes: 5 additions & 7 deletions ably/http/http.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import functools
import json
import logging
import time
import json
from urllib.parse import urljoin

import httpx
import msgpack

from ably.rest.auth import Auth
from ably.http.httputils import HttpUtils
from ably.rest.auth import Auth
from ably.transport.defaults import Defaults
from ably.util.exceptions import AblyException
from ably.util.helper import is_token_error, extract_url_params
from ably.util.helper import extract_url_params, is_token_error

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -188,14 +188,12 @@ async def make_request(self, method, path, version=None, headers=None, body=None

hosts = self.get_rest_hosts()
for retry_count, host in enumerate(hosts):
def should_stop_retrying():
def should_stop_retrying(retry_count=retry_count):
time_passed = time.time() - requested_at
# if it's the last try or cumulative timeout is done, we stop retrying
return retry_count == len(hosts) - 1 or time_passed > http_max_retry_duration

base_url = "%s://%s:%d" % (self.preferred_scheme,
host,
self.preferred_port)
base_url = f"{self.preferred_scheme}://{host}:{self.preferred_port}"
url = urljoin(base_url, path)

(clean_url, url_params) = extract_url_params(url)
Expand Down
2 changes: 1 addition & 1 deletion ably/http/httputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def default_headers(version=None):
version = ably.api_version
return {
"X-Ably-Version": version,
"Ably-Agent": 'ably-python/%s python/%s' % (ably.lib_version, platform.python_version())
"Ably-Agent": f'ably-python/{ably.lib_version} python/{platform.python_version()}'
}

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions ably/http/paginatedresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

def format_time_param(t):
try:
return '%d' % (calendar.timegm(t.utctimetuple()) * 1000)
return f'{calendar.timegm(t.utctimetuple()) * 1000}'
except Exception:
return str(t)

Expand All @@ -33,7 +33,7 @@ def format_params(params=None, direction=None, start=None, end=None, limit=None,
if limit:
if limit > 1000:
raise ValueError("The maximum allowed limit is 1000")
params['limit'] = '%d' % limit
params['limit'] = f'{limit}'

if 'start' in params and 'end' in params and params['start'] > params['end']:
raise ValueError("'end' parameter has to be greater than or equal to 'start'")
Expand Down
10 changes: 6 additions & 4 deletions ably/realtime/connection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

import functools
import logging
from typing import TYPE_CHECKING

from ably.realtime.connectionmanager import ConnectionManager
from ably.types.connectiondetails import ConnectionDetails
from ably.types.connectionstate import ConnectionEvent, ConnectionState, ConnectionStateChange
from ably.util.eventemitter import EventEmitter
from ably.util.exceptions import AblyException
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from ably.realtime.realtime import AblyRealtime
Expand Down Expand Up @@ -39,7 +41,7 @@ class Connection(EventEmitter): # RTN4

def __init__(self, realtime: AblyRealtime):
self.__realtime = realtime
self.__error_reason: Optional[AblyException] = None
self.__error_reason: AblyException | None = None
self.__state = ConnectionState.CONNECTING if realtime.options.auto_connect else ConnectionState.INITIALIZED
self.__connection_manager = ConnectionManager(self.__realtime, self.state)
self.__connection_manager.on('connectionstate', self._on_state_update) # RTN4a
Expand Down Expand Up @@ -102,7 +104,7 @@ def state(self) -> ConnectionState:

# RTN25
@property
def error_reason(self) -> Optional[AblyException]:
def error_reason(self) -> AblyException | None:
"""An object describing the last error which occurred on the channel, if any."""
return self.__error_reason

Expand All @@ -115,5 +117,5 @@ def connection_manager(self) -> ConnectionManager:
return self.__connection_manager

@property
def connection_details(self) -> Optional[ConnectionDetails]:
def connection_details(self) -> ConnectionDetails | None:
return self.__connection_manager.connection_details
61 changes: 32 additions & 29 deletions ably/realtime/connectionmanager.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from __future__ import annotations
import logging

import asyncio
import logging
from datetime import datetime
from queue import Queue
from typing import TYPE_CHECKING

import httpx
from ably.transport.websockettransport import WebSocketTransport, ProtocolMessageAction

from ably.transport.defaults import Defaults
from ably.transport.websockettransport import ProtocolMessageAction, WebSocketTransport
from ably.types.connectiondetails import ConnectionDetails
from ably.types.connectionerrors import ConnectionErrors
from ably.types.connectionstate import ConnectionEvent, ConnectionState, ConnectionStateChange
from ably.types.tokendetails import TokenDetails
from ably.util.exceptions import AblyException, IncompatibleClientIdException
from ably.util.eventemitter import EventEmitter
from datetime import datetime
from ably.util.helper import get_random_id, Timer, is_token_error
from typing import Optional, TYPE_CHECKING
from ably.types.connectiondetails import ConnectionDetails
from queue import Queue
from ably.util.exceptions import AblyException, IncompatibleClientIdException
from ably.util.helper import Timer, get_random_id, is_token_error

if TYPE_CHECKING:
from ably.realtime.realtime import AblyRealtime
Expand All @@ -26,23 +29,23 @@ def __init__(self, realtime: AblyRealtime, initial_state):
self.options = realtime.options
self.__ably = realtime
self.__state: ConnectionState = initial_state
self.__ping_future: Optional[asyncio.Future] = None
self.__ping_future: asyncio.Future | None = None
self.__timeout_in_secs: float = self.options.realtime_request_timeout / 1000
self.transport: Optional[WebSocketTransport] = None
self.__connection_details: Optional[ConnectionDetails] = None
self.connection_id: Optional[str] = None
self.transport: WebSocketTransport | None = None
self.__connection_details: ConnectionDetails | None = None
self.connection_id: str | None = None
self.__fail_state = ConnectionState.DISCONNECTED
self.transition_timer: Optional[Timer] = None
self.suspend_timer: Optional[Timer] = None
self.retry_timer: Optional[Timer] = None
self.connect_base_task: Optional[asyncio.Task] = None
self.disconnect_transport_task: Optional[asyncio.Task] = None
self.transition_timer: Timer | None = None
self.suspend_timer: Timer | None = None
self.retry_timer: Timer | None = None
self.connect_base_task: asyncio.Task | None = None
self.disconnect_transport_task: asyncio.Task | None = None
self.__fallback_hosts: list[str] = self.options.get_fallback_realtime_hosts()
self.queued_messages: Queue = Queue()
self.__error_reason: Optional[AblyException] = None
self.__error_reason: AblyException | None = None
super().__init__()

def enact_state_change(self, state: ConnectionState, reason: Optional[AblyException] = None) -> None:
def enact_state_change(self, state: ConnectionState, reason: AblyException | None = None) -> None:
current_state = self.__state
log.debug(f'ConnectionManager.enact_state_change(): {current_state} -> {state}; reason = {reason}')
self.__state = state
Expand Down Expand Up @@ -122,7 +125,7 @@ async def ping(self) -> float:
try:
response = await self.__ping_future
except asyncio.CancelledError:
raise AblyException("Ping request cancelled due to request timeout", 504, 50003)
raise AblyException("Ping request cancelled due to request timeout", 504, 50003) from None
return response

self.__ping_future = asyncio.Future()
Expand All @@ -136,14 +139,14 @@ async def ping(self) -> float:
try:
await asyncio.wait_for(self.__ping_future, self.__timeout_in_secs)
except asyncio.TimeoutError:
raise AblyException("Timeout waiting for ping response", 504, 50003)
raise AblyException("Timeout waiting for ping response", 504, 50003) from None

ping_end_time = datetime.now().timestamp()
response_time_ms = (ping_end_time - ping_start_time) * 1000
return round(response_time_ms, 2)

def on_connected(self, connection_details: ConnectionDetails, connection_id: str,
reason: Optional[AblyException] = None) -> None:
reason: AblyException | None = None) -> None:
self.__fail_state = ConnectionState.DISCONNECTED

self.__connection_details = connection_details
Expand Down Expand Up @@ -233,15 +236,15 @@ async def on_closed(self) -> None:
def on_channel_message(self, msg: dict) -> None:
self.__ably.channels._on_channel_message(msg)

def on_heartbeat(self, id: Optional[str]) -> None:
def on_heartbeat(self, id: str | None) -> None:
if self.__ping_future:
# Resolve on heartbeat from ping request.
if self.__ping_id == id:
if not self.__ping_future.cancelled():
self.__ping_future.set_result(None)
self.__ping_future = None

def deactivate_transport(self, reason: Optional[AblyException] = None):
def deactivate_transport(self, reason: AblyException | None = None):
self.transport = None
self.notify_state(ConnectionState.DISCONNECTED, reason)

Expand Down Expand Up @@ -275,7 +278,7 @@ def start_connect(self) -> None:
self.start_transition_timer(ConnectionState.CONNECTING)
self.connect_base_task = asyncio.create_task(self.connect_base())

async def connect_with_fallback_hosts(self, fallback_hosts: list) -> Optional[Exception]:
async def connect_with_fallback_hosts(self, fallback_hosts: list) -> Exception | None:
for host in fallback_hosts:
try:
if self.check_connection():
Expand Down Expand Up @@ -343,8 +346,8 @@ async def on_transport_failed(exception):
except asyncio.CancelledError:
return

def notify_state(self, state: ConnectionState, reason: Optional[AblyException] = None,
retry_immediately: Optional[bool] = None) -> None:
def notify_state(self, state: ConnectionState, reason: AblyException | None = None,
retry_immediately: bool | None = None) -> None:
# RTN15a
retry_immediately = (retry_immediately is not False) and (
state == ConnectionState.DISCONNECTED and self.__state == ConnectionState.CONNECTED)
Expand Down Expand Up @@ -383,7 +386,7 @@ def notify_state(self, state: ConnectionState, reason: Optional[AblyException] =
self.fail_queued_messages(reason)
self.ably.channels._propagate_connection_interruption(state, reason)

def start_transition_timer(self, state: ConnectionState, fail_state: Optional[ConnectionState] = None) -> None:
def start_transition_timer(self, state: ConnectionState, fail_state: ConnectionState | None = None) -> None:
log.debug(f'ConnectionManager.start_transition_timer(): transition state = {state}')

if self.transition_timer:
Expand Down Expand Up @@ -520,5 +523,5 @@ def state(self) -> ConnectionState:
return self.__state

@property
def connection_details(self) -> Optional[ConnectionDetails]:
def connection_details(self) -> ConnectionDetails | None:
return self.__connection_details
6 changes: 3 additions & 3 deletions ably/realtime/realtime.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
import asyncio
import logging
from typing import Optional
from ably.realtime.realtime_channel import Channels

from ably.realtime.connection import Connection, ConnectionState
from ably.realtime.realtime_channel import Channels
from ably.rest.rest import AblyRest


log = logging.getLogger(__name__)


Expand Down
Loading