Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
383 changes: 274 additions & 109 deletions conformance/test/client.py

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions conformance/test/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,17 +708,18 @@ def _find_free_port():
return s.getsockname()[1]


Mode = Literal["sync", "async"]
Server = Literal["daphne", "granian", "gunicorn", "hypercorn", "pyvoy", "uvicorn"]


class Args(argparse.Namespace):
mode: Literal["sync", "async"]
mode: Mode
server: Server


async def main() -> None:
parser = argparse.ArgumentParser(description="Conformance server")
parser.add_argument("--mode", choices=["sync", "async"])
parser.add_argument("--mode", choices=get_args(Mode))
parser.add_argument("--server", choices=get_args(Server))
args = parser.parse_args(namespace=Args())

Expand Down
56 changes: 34 additions & 22 deletions conformance/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
_client_py_path = str(_current_dir / "client.py")
_config_path = str(_current_dir / "config.yaml")

_skipped_tests = [
# Not implemented yet,
_skipped_tests_sync = [
# Need to use async APIs for proper cancellation support in Python.
"--skip",
"Client Cancellation/**",
]

_httpx_opts = [
# Trailers not supported
"--skip",
"**/Protocol:PROTOCOL_GRPC/**",
"--skip",
Expand All @@ -24,20 +30,28 @@
"gRPC Empty Responses/**",
"--skip",
"gRPC Proto Sub-Format Responses/**",
]

_skipped_tests_sync = [
*_skipped_tests,
# Need to use async APIs for proper cancellation support in Python.
# Bidirectional streaming not supported
"--skip",
"**/full-duplex/**",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My conformance test implementation for full-duplex was wrong, allowing httpx to pass it. I fixed it and now httpx correctly fails it for not supporting bidirectional streaming

# Cancellation delivery isn't reliable
"--known-flaky",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new client seems to perform just fine for cancellation and timeouts so far. This does reduce my motivation to chase the issue in httpx upstream.

Given it also enables bidirectional streaming, I guess we'll need to consider whether pyqwest should be the default client, but being so new definitely nervous about it, happy to hear any thoughts

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think it would be nice to consolidate on a single client, although it's tricky; at this point httpx is pretty battle-tested, but I would assume it'll basically never allow us to support bidi streams. I don't have a great sense on the underlying reqwest library, although being similarly pre-1.0 and with a lot of open PRs / issues, not sure how much more API churn we'll see? I'd be curious to get more of your take on the maintenance health of that library and how far it is from a stable release. Still, I've seen it used in a number of places so I imagine it's pretty solid.

It'd be nice to spike out on replacing httpx w/ pyqwest and see how it behaves (better? worse?).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great questions - in terms of pre-1.0, I probably already got used to the standard Rust practice of many production libraries seemingly stuck at 0.x forever ;) I have been checking that repo extensively lately and my impression is it is well maintained, the open PRs are definitely a cause for pause, but I think there is active work as well on higher priority issues. It also seems widely used, with even Cargo optionally using it, with the default being curl, not even a rust project. So it seems like the rust "standard library" http client and a trustworthy dependency.

In terms of impact on connect-python of API changes, anyways those are controlled by pyqwest (me). I am pretty confident in its client API and don't expect much churn especially in raw usage of bytestreams.

It'd be nice to spike out on replacing httpx w/ pyqwest and see how it behaves (better? worse?).

Cool, yeah I think it will be worth seeing this. This will have to come after implementing ASGI / WSGI transports like httpx has which we use extensively, after that will give it a shot

"Client Cancellation/**",
"--known-flaky",
"Timeouts/**",
]


def test_client_sync() -> None:
@pytest.mark.parametrize("client", ["httpx", "pyqwest"])
def test_client_sync(client: str) -> None:
args = maybe_patch_args_with_debug(
[sys.executable, _client_py_path, "--mode", "sync"]
[sys.executable, _client_py_path, "--mode", "sync", "--client", client]
)

opts = []
match client:
case "httpx":
opts = _httpx_opts

result = subprocess.run(
[
"go",
Expand All @@ -47,6 +61,7 @@ def test_client_sync() -> None:
_config_path,
"--mode",
"client",
*opts,
*_skipped_tests_sync,
"--",
*args,
Expand All @@ -59,18 +74,17 @@ def test_client_sync() -> None:
pytest.fail(f"\n{result.stdout}\n{result.stderr}")


_skipped_tests_async = [
*_skipped_tests,
# Cancellation currently not working for full duplex
"--skip",
"Client Cancellation/**/full-duplex/**",
]


def test_client_async() -> None:
@pytest.mark.parametrize("client", ["httpx", "pyqwest"])
def test_client_async(client: str) -> None:
args = maybe_patch_args_with_debug(
[sys.executable, _client_py_path, "--mode", "async"]
[sys.executable, _client_py_path, "--mode", "async", "--client", client]
)

opts = []
match client:
case "httpx":
opts = _httpx_opts

result = subprocess.run(
[
"go",
Expand All @@ -80,9 +94,7 @@ def test_client_async() -> None:
_config_path,
"--mode",
"client",
*_skipped_tests_async,
"--known-flaky",
"Client Cancellation/**",
*opts,
"--",
*args,
],
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dev = [
"mkdocs==1.6.1",
"mkdocs-material==9.6.20",
"mkdocstrings[python]==0.30.1",
"pyqwest==0.1.0",
"pyright[nodejs]==1.1.405",
"pytest-timeout==2.4.0",
"pyvoy==0.2.0",
Expand Down
99 changes: 59 additions & 40 deletions src/connectrpc/_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from . import _client_shared
from ._asyncio_timeout import timeout as asyncio_timeout
from ._codec import Codec, get_proto_binary_codec, get_proto_json_codec
from ._envelope import EnvelopeReader
from ._interceptor_async import (
BidiStreamInterceptor,
ClientStreamInterceptor,
Expand All @@ -21,10 +20,8 @@
resolve_interceptors,
)
from ._protocol import ConnectWireError
from ._protocol_connect import (
CONNECT_STREAMING_HEADER_COMPRESSION,
ConnectEnvelopeWriter,
)
from ._protocol_connect import ConnectClientProtocol, ConnectEnvelopeWriter
from ._protocol_grpc import GRPCClientProtocol
from ._response_metadata import handle_response_headers
from .code import Code
from .errors import ConnectError
Expand All @@ -42,6 +39,7 @@
from types import TracebackType

from ._compression import Compression
from ._envelope import EnvelopeReader
from .method import MethodInfo
from .request import Headers, RequestContext

Expand Down Expand Up @@ -91,6 +89,7 @@ def __init__(
address: str,
*,
proto_json: bool = False,
grpc: bool = False,
accept_compression: Iterable[str] | None = None,
send_compression: str | None = None,
timeout_ms: int | None = None,
Expand Down Expand Up @@ -128,6 +127,11 @@ def __init__(
self._close_client = True
self._closed = False

if grpc:
self._protocol = GRPCClientProtocol()
else:
self._protocol = ConnectClientProtocol()

interceptors = resolve_interceptors(interceptors)
execute_unary = self._send_request_unary
for interceptor in (
Expand Down Expand Up @@ -192,7 +196,7 @@ async def execute_unary(
timeout_ms: int | None = None,
use_get: bool = False,
) -> RES:
ctx = _client_shared.create_request_context(
ctx = self._protocol.create_request_context(
method=method,
http_method="GET" if use_get else "POST",
user_headers=headers,
Expand All @@ -212,7 +216,7 @@ async def execute_client_stream(
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> RES:
ctx = _client_shared.create_request_context(
ctx = self._protocol.create_request_context(
method=method,
http_method="POST",
user_headers=headers,
Expand All @@ -232,7 +236,7 @@ def execute_server_stream(
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> AsyncIterator[RES]:
ctx = _client_shared.create_request_context(
ctx = self._protocol.create_request_context(
method=method,
http_method="POST",
user_headers=headers,
Expand All @@ -252,7 +256,7 @@ def execute_bidi_stream(
headers: Headers | Mapping[str, str] | None = None,
timeout_ms: int | None = None,
) -> AsyncIterator[RES]:
ctx = _client_shared.create_request_context(
ctx = self._protocol.create_request_context(
method=method,
http_method="POST",
user_headers=headers,
Expand All @@ -267,6 +271,11 @@ def execute_bidi_stream(
async def _send_request_unary(
self, request: REQ, ctx: RequestContext[REQ, RES]
) -> RES:
if isinstance(self._protocol, GRPCClientProtocol):
return await _consume_single_response(
self._send_request_bidi_stream(_yield_single_message(request), ctx)
)

request_headers = httpx.Headers(list(ctx.request_headers().allitems()))
url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}"
if (timeout_ms := ctx.timeout_ms()) is not None:
Expand Down Expand Up @@ -303,14 +312,14 @@ async def _send_request_unary(
timeout_s,
)

_client_shared.validate_response_content_encoding(
resp.headers.get("content-encoding", "")
)
_client_shared.validate_response_content_type(
self._protocol.validate_response(
self._codec.name(),
resp.status_code,
resp.headers.get("content-type", ""),
)
# Decompression itself is handled by httpx, but we validate it
# by resolving it.
self._protocol.handle_response_compression(resp.headers, stream=False)
handle_response_headers(resp.headers)

if resp.status_code == 200:
Expand Down Expand Up @@ -360,51 +369,61 @@ async def _send_request_bidi_stream(
timeout_s = None
timeout = USE_CLIENT_DEFAULT

reader: EnvelopeReader | None = None
resp: httpx.Response | None = None
try:
request_data = _streaming_request_content(
request, self._codec, self._send_compression
)

async with (
asyncio_timeout(timeout_s),
self._session.stream(
async with asyncio_timeout(timeout_s):
httpx_req = self._session.build_request(
method="POST",
url=url,
headers=request_headers,
content=request_data,
timeout=timeout,
) as resp,
):
compression = _client_shared.validate_response_content_encoding(
resp.headers.get(CONNECT_STREAMING_HEADER_COMPRESSION, "")
)
_client_shared.validate_stream_response_content_type(
self._codec.name(), resp.headers.get("content-type", "")
)
handle_response_headers(resp.headers)

if resp.status_code == 200:
reader = EnvelopeReader(
ctx.method().output,
self._codec,
compression,
self._read_max_bytes,
)
async for chunk in resp.aiter_bytes():
for message in reader.feed(chunk):
yield message
# Check for cancellation each message. While this seems heavyweight,
# conformance tests require it.
await sleep(0)
else:
raise ConnectWireError.from_response(resp).to_exception()
resp = await self._session.send(httpx_req, stream=True)
try:
handle_response_headers(resp.headers)
if resp.status_code == 200:
self._protocol.validate_stream_response(
self._codec.name(), resp.headers.get("content-type", "")
)
compression = self._protocol.handle_response_compression(
resp.headers, stream=True
)
reader = self._protocol.create_envelope_reader(
ctx.method().output,
self._codec,
compression,
self._read_max_bytes,
)
async for chunk in resp.aiter_bytes():
for message in reader.feed(chunk):
yield message
# Check for cancellation each message. While this seems heavyweight,
# conformance tests require it.
await sleep(0)
reader.handle_response_complete(resp)
else:
raise ConnectWireError.from_response(resp).to_exception()
finally:
await asyncio.shield(resp.aclose())
except (httpx.TimeoutException, TimeoutError, asyncio.TimeoutError) as e:
raise ConnectError(Code.DEADLINE_EXCEEDED, "Request timed out") from e
except ConnectError:
raise
except CancelledError as e:
raise ConnectError(Code.CANCELED, "Request was cancelled") from e
except Exception as e:
if rst_err := _client_shared.maybe_map_stream_reset(e, ctx):
# It is possible for a reset to come with trailers which should
# be used.
if reader and resp:
reader.handle_response_complete(resp, rst_err)
raise rst_err from e
raise ConnectError(Code.UNAVAILABLE, str(e)) from e


Expand Down
Loading