diff --git a/conformance/test/client.py b/conformance/test/client.py index e621496..535b628 100644 --- a/conformance/test/client.py +++ b/conformance/test/client.py @@ -2,12 +2,15 @@ import argparse import asyncio +import contextlib +import multiprocessing +import queue import ssl import sys import time import traceback from tempfile import NamedTemporaryFile -from typing import TYPE_CHECKING, Literal, TypeVar +from typing import TYPE_CHECKING, Literal, TypeVar, get_args import httpx from _util import create_standard_streams @@ -16,7 +19,13 @@ ClientCompatResponse, ) from gen.connectrpc.conformance.v1.config_pb2 import Code as ConformanceCode -from gen.connectrpc.conformance.v1.config_pb2 import Codec, Compression, HTTPVersion +from gen.connectrpc.conformance.v1.config_pb2 import ( + Codec, + Compression, + HTTPVersion, + Protocol, + StreamType, +) from gen.connectrpc.conformance.v1.service_connect import ( ConformanceServiceClient, ConformanceServiceClientSync, @@ -31,6 +40,9 @@ UnimplementedRequest, ) from google.protobuf.message import Message +from pyqwest import HTTPTransport, SyncHTTPTransport +from pyqwest import HTTPVersion as PyQwestHTTPVersion +from pyqwest.httpx import AsyncPyqwestTransport, PyqwestTransport from connectrpc.client import ResponseMetadata from connectrpc.code import Code @@ -106,8 +118,127 @@ def _unpack_request(message: Any, request: T) -> T: return request +async def httpx_client_kwargs(test_request: ClientCompatRequest) -> dict: + kwargs = {} + match test_request.http_version: + case HTTPVersion.HTTP_VERSION_1: + kwargs["http1"] = True + kwargs["http2"] = False + case HTTPVersion.HTTP_VERSION_2: + kwargs["http1"] = False + kwargs["http2"] = True + if test_request.server_tls_cert: + ctx = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, + cadata=test_request.server_tls_cert.decode(), + ) + if test_request.HasField("client_tls_creds"): + + def load_certs() -> None: + with ( + NamedTemporaryFile() as cert_file, + NamedTemporaryFile() as key_file, + ): + cert_file.write(test_request.client_tls_creds.cert) + cert_file.flush() + key_file.write(test_request.client_tls_creds.key) + key_file.flush() + ctx.load_cert_chain(certfile=cert_file.name, keyfile=key_file.name) + + await asyncio.to_thread(load_certs) + kwargs["verify"] = ctx + + return kwargs + + +def pyqwest_client_kwargs(test_request: ClientCompatRequest) -> dict: + kwargs: dict = {"enable_gzip": True, "enable_brotli": True, "enable_zstd": True} + match test_request.http_version: + case HTTPVersion.HTTP_VERSION_1: + kwargs["http_version"] = PyQwestHTTPVersion.HTTP1 + case HTTPVersion.HTTP_VERSION_2: + kwargs["http_version"] = PyQwestHTTPVersion.HTTP2 + if test_request.server_tls_cert: + kwargs["tls_ca_cert"] = test_request.server_tls_cert + if test_request.HasField("client_tls_creds"): + kwargs["tls_key"] = test_request.client_tls_creds.key + kwargs["tls_cert"] = test_request.client_tls_creds.cert + + return kwargs + + +@contextlib.asynccontextmanager +async def client_sync( + test_request: ClientCompatRequest, client_type: Client +) -> AsyncIterator[ConformanceServiceClientSync]: + read_max_bytes = None + if test_request.message_receive_limit: + read_max_bytes = test_request.message_receive_limit + scheme = "https" if test_request.server_tls_cert else "http" + cleanup = contextlib.ExitStack() + match client_type: + case "httpx": + args = await httpx_client_kwargs(test_request) + session = cleanup.enter_context(httpx.Client(**args)) + case "pyqwest": + args = pyqwest_client_kwargs(test_request) + 
http_transport = cleanup.enter_context(SyncHTTPTransport(**args)) + transport = cleanup.enter_context(PyqwestTransport(http_transport)) + session = cleanup.enter_context(httpx.Client(transport=transport)) + + with ( + cleanup, + ConformanceServiceClientSync( + f"{scheme}://{test_request.host}:{test_request.port}", + session=session, + send_compression=_convert_compression(test_request.compression), + proto_json=test_request.codec == Codec.CODEC_JSON, + grpc=test_request.protocol == Protocol.PROTOCOL_GRPC, + read_max_bytes=read_max_bytes, + ) as client, + ): + yield client + + +@contextlib.asynccontextmanager +async def client_async( + test_request: ClientCompatRequest, client_type: Client +) -> AsyncIterator[ConformanceServiceClient]: + read_max_bytes = None + if test_request.message_receive_limit: + read_max_bytes = test_request.message_receive_limit + scheme = "https" if test_request.server_tls_cert else "http" + cleanup = contextlib.AsyncExitStack() + match client_type: + case "httpx": + args = await httpx_client_kwargs(test_request) + session = await cleanup.enter_async_context(httpx.AsyncClient(**args)) + case "pyqwest": + args = pyqwest_client_kwargs(test_request) + http_transport = await cleanup.enter_async_context(HTTPTransport(**args)) + transport = await cleanup.enter_async_context( + AsyncPyqwestTransport(http_transport) + ) + session = await cleanup.enter_async_context( + httpx.AsyncClient(transport=transport) + ) + + async with ( + cleanup, + ConformanceServiceClient( + f"{scheme}://{test_request.host}:{test_request.port}", + session=session, + send_compression=_convert_compression(test_request.compression), + proto_json=test_request.codec == Codec.CODEC_JSON, + grpc=test_request.protocol == Protocol.PROTOCOL_GRPC, + read_max_bytes=read_max_bytes, + ) as client, + ): + yield client + + async def _run_test( - mode: Literal["sync", "async"], test_request: ClientCompatRequest + mode: Mode, test_request: ClientCompatRequest, client_type: Client ) -> ClientCompatResponse: test_response = ClientCompatResponse() test_response.test_name = test_request.test_name @@ -115,9 +246,6 @@ async def _run_test( timeout_ms = None if test_request.timeout_ms: timeout_ms = test_request.timeout_ms - read_max_bytes = None - if test_request.message_receive_limit: - read_max_bytes = test_request.message_receive_limit request_headers = Headers() for header in test_request.request_headers: @@ -129,82 +257,78 @@ async def _run_test( with ResponseMetadata() as meta: try: task: asyncio.Task - session_kwargs = {} - match test_request.http_version: - case HTTPVersion.HTTP_VERSION_1: - session_kwargs["http1"] = True - session_kwargs["http2"] = False - case HTTPVersion.HTTP_VERSION_2: - session_kwargs["http1"] = False - session_kwargs["http2"] = True - scheme = "http" - if test_request.server_tls_cert: - scheme = "https" - ctx = ssl.create_default_context( - purpose=ssl.Purpose.SERVER_AUTH, - cadata=test_request.server_tls_cert.decode(), - ) - if test_request.HasField("client_tls_creds"): - with ( - NamedTemporaryFile() as cert_file, - NamedTemporaryFile() as key_file, - ): - cert_file.write(test_request.client_tls_creds.cert) - cert_file.flush() - key_file.write(test_request.client_tls_creds.key) - key_file.flush() - ctx.load_cert_chain( - certfile=cert_file.name, keyfile=key_file.name - ) - session_kwargs["verify"] = ctx + request_closed = asyncio.Event() match mode: case "sync": - with ( - httpx.Client(**session_kwargs) as session, - ConformanceServiceClientSync( - 
f"{scheme}://{test_request.host}:{test_request.port}", - session=session, - send_compression=_convert_compression( - test_request.compression - ), - proto_json=test_request.codec == Codec.CODEC_JSON, - read_max_bytes=read_max_bytes, - ) as client, - ): + async with client_sync(test_request, client_type) as client: match test_request.method: case "BidiStream": + request_queue = queue.Queue() def send_bidi_stream_request_sync( client: ConformanceServiceClientSync, request: Iterator[BidiStreamRequest], ) -> None: - for message in client.bidi_stream( + responses = client.bidi_stream( request, headers=request_headers, timeout_ms=timeout_ms, + ) + for message in test_request.request_messages: + if test_request.request_delay_ms: + time.sleep( + test_request.request_delay_ms / 1000.0 + ) + request_queue.put( + _unpack_request( + message, BidiStreamRequest() + ) + ) + + if ( + test_request.stream_type + != StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM + ): + continue + + response = next(responses, None) + if response is not None: + payloads.append(response.payload) + if ( + num + := test_request.cancel.after_num_responses + ) and len(payloads) >= num: + task.cancel() + + if test_request.cancel.HasField( + "before_close_send" ): - payloads.append(message.payload) + task.cancel() + + request_queue.put(None) + + request_closed.set() + + for response in responses: + payloads.append(response.payload) if ( num := test_request.cancel.after_num_responses ) and len(payloads) >= num: task.cancel() - def bidi_request_stream_sync(): - for message in test_request.request_messages: - if test_request.request_delay_ms: - time.sleep( - test_request.request_delay_ms / 1000.0 - ) - yield _unpack_request( - message, BidiStreamRequest() - ) + def bidi_stream_request_sync(): + while True: + request = request_queue.get() + if request is None: + return + yield request task = asyncio.create_task( asyncio.to_thread( send_bidi_stream_request_sync, client, - bidi_request_stream_sync(), + bidi_stream_request_sync(), ) ) @@ -230,6 +354,11 @@ def request_stream_sync(): yield _unpack_request( message, ClientStreamRequest() ) + if test_request.cancel.HasField( + "before_close_send" + ): + task.cancel() + request_closed.set() task = asyncio.create_task( asyncio.to_thread( @@ -262,6 +391,7 @@ def send_idempotent_unary_request_sync( ), ) ) + request_closed.set() case "ServerStream": def send_server_stream_request_sync( @@ -290,6 +420,7 @@ def send_server_stream_request_sync( ), ) ) + request_closed.set() case "Unary": def send_unary_request_sync( @@ -313,6 +444,7 @@ def send_unary_request_sync( ), ) ) + request_closed.set() case "Unimplemented": task = asyncio.create_task( asyncio.to_thread( @@ -325,6 +457,7 @@ def send_unary_request_sync( timeout_ms=timeout_ms, ) ) + request_closed.set() case _: msg = f"Unrecognized method: {test_request.method}" raise ValueError(msg) @@ -335,20 +468,10 @@ def send_unary_request_sync( task.cancel() await task case "async": - async with ( - httpx.AsyncClient(**session_kwargs) as session, - ConformanceServiceClient( - f"{scheme}://{test_request.host}:{test_request.port}", - session=session, - send_compression=_convert_compression( - test_request.compression - ), - proto_json=test_request.codec == Codec.CODEC_JSON, - read_max_bytes=read_max_bytes, - ) as client, - ): + async with client_async(test_request, client_type) as client: match test_request.method: case "BidiStream": + request_queue = asyncio.Queue() async def send_bidi_stream_request( client: ConformanceServiceClient, @@ -359,31 +482,55 @@ 
async def send_bidi_stream_request( headers=request_headers, timeout_ms=timeout_ms, ) - async for response in responses: - payloads.append(response.payload) - if ( - num - := test_request.cancel.after_num_responses - ) and len(payloads) >= num: - task.cancel() - - async def bidi_stream_request(): for message in test_request.request_messages: if test_request.request_delay_ms: await asyncio.sleep( test_request.request_delay_ms / 1000.0 ) - yield _unpack_request( - message, BidiStreamRequest() + await request_queue.put( + _unpack_request( + message, BidiStreamRequest() + ) ) + + if ( + test_request.stream_type + != StreamType.STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM + ): + continue + + response = await anext(responses, None) + if response is not None: + payloads.append(response.payload) + if ( + num + := test_request.cancel.after_num_responses + ) and len(payloads) >= num: + task.cancel() + if test_request.cancel.HasField( "before_close_send" ): task.cancel() - # Don't finish the stream for this case by sleeping for - # a long time. We won't end up sleeping for long since we - # cancelled. - await asyncio.sleep(600) + + await request_queue.put(None) + + request_closed.set() + + async for response in responses: + payloads.append(response.payload) + if ( + num + := test_request.cancel.after_num_responses + ) and len(payloads) >= num: + task.cancel() + + async def bidi_stream_request(): + while True: + request = await request_queue.get() + if request is None: + return + yield request task = asyncio.create_task( send_bidi_stream_request( @@ -417,10 +564,7 @@ async def client_stream_request(): "before_close_send" ): task.cancel() - # Don't finish the stream for this case by sleeping for - # a long time. We won't end up sleeping for long since we - # cancelled. - await asyncio.sleep(600) + request_closed.set() task = asyncio.create_task( send_client_stream_request( @@ -450,6 +594,7 @@ async def send_idempotent_unary_request( ), ) ) + request_closed.set() case "ServerStream": async def send_server_stream_request( @@ -477,6 +622,7 @@ async def send_server_stream_request( ), ) ) + request_closed.set() case "Unary": async def send_unary_request( @@ -499,6 +645,7 @@ async def send_unary_request( ), ) ) + request_closed.set() case "Unimplemented": task = asyncio.create_task( client.unimplemented( @@ -510,10 +657,12 @@ async def send_unary_request( timeout_ms=timeout_ms, ) ) + request_closed.set() case _: msg = f"Unrecognized method: {test_request.method}" raise ValueError(msg) if test_request.cancel.after_close_send_ms: + await request_closed.wait() await asyncio.sleep( test_request.cancel.after_close_send_ms / 1000.0 ) @@ -541,34 +690,50 @@ async def send_unary_request( return test_response +Mode = Literal["sync", "async"] +Client = Literal["httpx", "pyqwest"] + + class Args(argparse.Namespace): - mode: Literal["sync", "async"] + mode: Mode + client: Client + parallel: int async def main() -> None: parser = argparse.ArgumentParser(description="Conformance client") - parser.add_argument("--mode", choices=["sync", "async"]) + parser.add_argument("--mode", choices=get_args(Mode)) + parser.add_argument("--parallel", type=int, default=multiprocessing.cpu_count() * 4) + parser.add_argument("--client", choices=get_args(Client)) args = parser.parse_args(namespace=Args()) stdin, stdout = await create_standard_streams() - while True: - try: - size_buf = await stdin.readexactly(4) - except asyncio.IncompleteReadError: - return - size = int.from_bytes(size_buf, byteorder="big") - # Allow to raise even on EOF since we 
always should have a message
-        request_buf = await stdin.readexactly(size)
-        request = ClientCompatRequest()
-        request.ParseFromString(request_buf)
-
-        response = await _run_test(args.mode, request)
-
-        response_buf = response.SerializeToString()
-        size_buf = len(response_buf).to_bytes(4, byteorder="big")
-        stdout.write(size_buf)
-        stdout.write(response_buf)
-        await stdout.drain()
+    sema = asyncio.Semaphore(args.parallel)
+    tasks: list[asyncio.Task] = []
+    try:
+        while True:
+            try:
+                size_buf = await stdin.readexactly(4)
+            except asyncio.IncompleteReadError:
+                return
+            size = int.from_bytes(size_buf, byteorder="big")
+            # Allow this to raise even on EOF since we should always have a message
+            request_buf = await stdin.readexactly(size)
+            request = ClientCompatRequest()
+            request.ParseFromString(request_buf)
+
+            async def task(request: ClientCompatRequest) -> None:
+                async with sema:
+                    response = await _run_test(args.mode, request, args.client)
+
+                response_buf = response.SerializeToString()
+                size_buf = len(response_buf).to_bytes(4, byteorder="big")
+                # A single write keeps the length prefix and payload contiguous
+                # even with concurrent tasks.
+                stdout.write(size_buf + response_buf)
+                await stdout.drain()
+
+            tasks.append(asyncio.create_task(task(request)))
+    finally:
+        await asyncio.gather(*tasks)
 
 
 if __name__ == "__main__":
diff --git a/conformance/test/server.py b/conformance/test/server.py
index 799a58e..6e5c09c 100644
--- a/conformance/test/server.py
+++ b/conformance/test/server.py
@@ -708,17 +708,18 @@ def _find_free_port():
     return s.getsockname()[1]
 
 
+Mode = Literal["sync", "async"]
 Server = Literal["daphne", "granian", "gunicorn", "hypercorn", "pyvoy", "uvicorn"]
 
 
 class Args(argparse.Namespace):
-    mode: Literal["sync", "async"]
+    mode: Mode
     server: Server
 
 
 async def main() -> None:
     parser = argparse.ArgumentParser(description="Conformance server")
-    parser.add_argument("--mode", choices=["sync", "async"])
+    parser.add_argument("--mode", choices=get_args(Mode))
     parser.add_argument("--server", choices=get_args(Server))
     args = parser.parse_args(namespace=Args())
diff --git a/conformance/test/test_client.py b/conformance/test/test_client.py
index 22426ca..bbafad2 100644
--- a/conformance/test/test_client.py
+++ b/conformance/test/test_client.py
@@ -12,8 +12,14 @@
 _client_py_path = str(_current_dir / "client.py")
 _config_path = str(_current_dir / "config.yaml")
 
-_skipped_tests = [
-    # Not implemented yet,
+_skipped_tests_sync = [
+    # Need to use async APIs for proper cancellation support in Python.
+    "--skip",
+    "Client Cancellation/**",
+]
+
+_httpx_opts = [
+    # Trailers not supported
     "--skip",
     "**/Protocol:PROTOCOL_GRPC/**",
     "--skip",
@@ -24,20 +30,28 @@
     "gRPC Empty Responses/**",
     "--skip",
     "gRPC Proto Sub-Format Responses/**",
-]
-
-_skipped_tests_sync = [
-    *_skipped_tests,
-    # Need to use async APIs for proper cancellation support in Python. 
+ # Bidirectional streaming not supported "--skip", + "**/full-duplex/**", + # Cancellation delivery isn't reliable + "--known-flaky", "Client Cancellation/**", + "--known-flaky", + "Timeouts/**", ] -def test_client_sync() -> None: +@pytest.mark.parametrize("client", ["httpx", "pyqwest"]) +def test_client_sync(client: str) -> None: args = maybe_patch_args_with_debug( - [sys.executable, _client_py_path, "--mode", "sync"] + [sys.executable, _client_py_path, "--mode", "sync", "--client", client] ) + + opts = [] + match client: + case "httpx": + opts = _httpx_opts + result = subprocess.run( [ "go", @@ -47,6 +61,7 @@ def test_client_sync() -> None: _config_path, "--mode", "client", + *opts, *_skipped_tests_sync, "--", *args, @@ -59,18 +74,17 @@ def test_client_sync() -> None: pytest.fail(f"\n{result.stdout}\n{result.stderr}") -_skipped_tests_async = [ - *_skipped_tests, - # Cancellation currently not working for full duplex - "--skip", - "Client Cancellation/**/full-duplex/**", -] - - -def test_client_async() -> None: +@pytest.mark.parametrize("client", ["httpx", "pyqwest"]) +def test_client_async(client: str) -> None: args = maybe_patch_args_with_debug( - [sys.executable, _client_py_path, "--mode", "async"] + [sys.executable, _client_py_path, "--mode", "async", "--client", client] ) + + opts = [] + match client: + case "httpx": + opts = _httpx_opts + result = subprocess.run( [ "go", @@ -80,9 +94,7 @@ def test_client_async() -> None: _config_path, "--mode", "client", - *_skipped_tests_async, - "--known-flaky", - "Client Cancellation/**", + *opts, "--", *args, ], diff --git a/pyproject.toml b/pyproject.toml index f6e2732..067bf36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ dev = [ "mkdocs==1.6.1", "mkdocs-material==9.6.20", "mkdocstrings[python]==0.30.1", + "pyqwest==0.1.0", "pyright[nodejs]==1.1.405", "pytest-timeout==2.4.0", "pyvoy==0.2.0", diff --git a/src/connectrpc/_client_async.py b/src/connectrpc/_client_async.py index d8fcfc3..c3f8bc8 100644 --- a/src/connectrpc/_client_async.py +++ b/src/connectrpc/_client_async.py @@ -11,7 +11,6 @@ from . 
import _client_shared from ._asyncio_timeout import timeout as asyncio_timeout from ._codec import Codec, get_proto_binary_codec, get_proto_json_codec -from ._envelope import EnvelopeReader from ._interceptor_async import ( BidiStreamInterceptor, ClientStreamInterceptor, @@ -21,10 +20,8 @@ resolve_interceptors, ) from ._protocol import ConnectWireError -from ._protocol_connect import ( - CONNECT_STREAMING_HEADER_COMPRESSION, - ConnectEnvelopeWriter, -) +from ._protocol_connect import ConnectClientProtocol, ConnectEnvelopeWriter +from ._protocol_grpc import GRPCClientProtocol from ._response_metadata import handle_response_headers from .code import Code from .errors import ConnectError @@ -42,6 +39,7 @@ from types import TracebackType from ._compression import Compression + from ._envelope import EnvelopeReader from .method import MethodInfo from .request import Headers, RequestContext @@ -91,6 +89,7 @@ def __init__( address: str, *, proto_json: bool = False, + grpc: bool = False, accept_compression: Iterable[str] | None = None, send_compression: str | None = None, timeout_ms: int | None = None, @@ -128,6 +127,11 @@ def __init__( self._close_client = True self._closed = False + if grpc: + self._protocol = GRPCClientProtocol() + else: + self._protocol = ConnectClientProtocol() + interceptors = resolve_interceptors(interceptors) execute_unary = self._send_request_unary for interceptor in ( @@ -192,7 +196,7 @@ async def execute_unary( timeout_ms: int | None = None, use_get: bool = False, ) -> RES: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="GET" if use_get else "POST", user_headers=headers, @@ -212,7 +216,7 @@ async def execute_client_stream( headers: Headers | Mapping[str, str] | None = None, timeout_ms: int | None = None, ) -> RES: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="POST", user_headers=headers, @@ -232,7 +236,7 @@ def execute_server_stream( headers: Headers | Mapping[str, str] | None = None, timeout_ms: int | None = None, ) -> AsyncIterator[RES]: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="POST", user_headers=headers, @@ -252,7 +256,7 @@ def execute_bidi_stream( headers: Headers | Mapping[str, str] | None = None, timeout_ms: int | None = None, ) -> AsyncIterator[RES]: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="POST", user_headers=headers, @@ -267,6 +271,11 @@ def execute_bidi_stream( async def _send_request_unary( self, request: REQ, ctx: RequestContext[REQ, RES] ) -> RES: + if isinstance(self._protocol, GRPCClientProtocol): + return await _consume_single_response( + self._send_request_bidi_stream(_yield_single_message(request), ctx) + ) + request_headers = httpx.Headers(list(ctx.request_headers().allitems())) url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}" if (timeout_ms := ctx.timeout_ms()) is not None: @@ -303,14 +312,14 @@ async def _send_request_unary( timeout_s, ) - _client_shared.validate_response_content_encoding( - resp.headers.get("content-encoding", "") - ) - _client_shared.validate_response_content_type( + self._protocol.validate_response( self._codec.name(), resp.status_code, resp.headers.get("content-type", ""), ) + # Decompression itself is handled by httpx, but we validate it + # by resolving it. 
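+        # gRPC calls never reach this unary path (they are funneled through
+        # the bidi-stream implementation above), so this always resolves the
+        # Connect unary compression header.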
+ self._protocol.handle_response_compression(resp.headers, stream=False) handle_response_headers(resp.headers) if resp.status_code == 200: @@ -360,44 +369,48 @@ async def _send_request_bidi_stream( timeout_s = None timeout = USE_CLIENT_DEFAULT + reader: EnvelopeReader | None = None + resp: httpx.Response | None = None try: request_data = _streaming_request_content( request, self._codec, self._send_compression ) - async with ( - asyncio_timeout(timeout_s), - self._session.stream( + async with asyncio_timeout(timeout_s): + httpx_req = self._session.build_request( method="POST", url=url, headers=request_headers, content=request_data, timeout=timeout, - ) as resp, - ): - compression = _client_shared.validate_response_content_encoding( - resp.headers.get(CONNECT_STREAMING_HEADER_COMPRESSION, "") ) - _client_shared.validate_stream_response_content_type( - self._codec.name(), resp.headers.get("content-type", "") - ) - handle_response_headers(resp.headers) - - if resp.status_code == 200: - reader = EnvelopeReader( - ctx.method().output, - self._codec, - compression, - self._read_max_bytes, - ) - async for chunk in resp.aiter_bytes(): - for message in reader.feed(chunk): - yield message - # Check for cancellation each message. While this seems heavyweight, - # conformance tests require it. - await sleep(0) - else: - raise ConnectWireError.from_response(resp).to_exception() + resp = await self._session.send(httpx_req, stream=True) + try: + handle_response_headers(resp.headers) + if resp.status_code == 200: + self._protocol.validate_stream_response( + self._codec.name(), resp.headers.get("content-type", "") + ) + compression = self._protocol.handle_response_compression( + resp.headers, stream=True + ) + reader = self._protocol.create_envelope_reader( + ctx.method().output, + self._codec, + compression, + self._read_max_bytes, + ) + async for chunk in resp.aiter_bytes(): + for message in reader.feed(chunk): + yield message + # Check for cancellation each message. While this seems heavyweight, + # conformance tests require it. + await sleep(0) + reader.handle_response_complete(resp) + else: + raise ConnectWireError.from_response(resp).to_exception() + finally: + await asyncio.shield(resp.aclose()) except (httpx.TimeoutException, TimeoutError, asyncio.TimeoutError) as e: raise ConnectError(Code.DEADLINE_EXCEEDED, "Request timed out") from e except ConnectError: @@ -405,6 +418,12 @@ async def _send_request_bidi_stream( except CancelledError as e: raise ConnectError(Code.CANCELED, "Request was cancelled") from e except Exception as e: + if rst_err := _client_shared.maybe_map_stream_reset(e, ctx): + # It is possible for a reset to come with trailers which should + # be used. + if reader and resp: + reader.handle_response_complete(resp, rst_err) + raise rst_err from e raise ConnectError(Code.UNAVAILABLE, str(e)) from e diff --git a/src/connectrpc/_client_shared.py b/src/connectrpc/_client_shared.py index 6ddd618..474150b 100644 --- a/src/connectrpc/_client_shared.py +++ b/src/connectrpc/_client_shared.py @@ -1,41 +1,30 @@ from __future__ import annotations import base64 +import re from http import HTTPStatus from typing import TYPE_CHECKING, TypeVar +from httpx import RemoteProtocolError + from . 
import _compression
 from ._codec import CODEC_NAME_JSON, CODEC_NAME_JSON_CHARSET_UTF8, Codec
-from ._compression import (
-    Compression,
-    get_accept_encoding,
-    get_available_compressions,
-    get_compression,
-)
+from ._compression import Compression, get_available_compressions, get_compression
 from ._protocol import ConnectWireError
 from ._protocol_connect import (
     CONNECT_PROTOCOL_VERSION,
     CONNECT_STREAMING_CONTENT_TYPE_PREFIX,
-    CONNECT_STREAMING_HEADER_ACCEPT_COMPRESSION,
-    CONNECT_STREAMING_HEADER_COMPRESSION,
     CONNECT_UNARY_CONTENT_TYPE_PREFIX,
-    CONNECT_UNARY_HEADER_ACCEPT_COMPRESSION,
-    CONNECT_UNARY_HEADER_COMPRESSION,
     codec_name_from_content_type,
 )
-from ._version import __version__
 from .code import Code
 from .errors import ConnectError
-from .request import Headers, RequestContext
 
 if TYPE_CHECKING:
-    from collections.abc import Iterable, Mapping
-
     from httpx import Headers as HttpxHeaders
 
-    from .method import MethodInfo
+    from .request import RequestContext
 
-_DEFAULT_CONNECT_USER_AGENT = f"connectrpc/{__version__}"
 
 REQ = TypeVar("REQ")
 RES = TypeVar("RES")
@@ -54,64 +43,6 @@ def resolve_send_compression(compression_name: str | None) -> Compression | None:
     return compression
 
 
-def create_request_context(
-    *,
-    method: MethodInfo[REQ, RES],
-    http_method: str,
-    user_headers: Headers | Mapping[str, str] | None,
-    timeout_ms: int | None,
-    codec: Codec,
-    stream: bool,
-    accept_compression: Iterable[str] | None,
-    send_compression: Compression | None,
-) -> RequestContext:
-    match user_headers:
-        case Headers():
-            # Copy to prevent modification if user keeps reference
-            # TODO: Optimize
-            headers = Headers(tuple(user_headers.allitems()))
-        case None:
-            headers = Headers()
-        case _:
-            headers = Headers(user_headers)
-
-    if "user-agent" not in headers:
-        headers["user-agent"] = _DEFAULT_CONNECT_USER_AGENT
-    headers["connect-protocol-version"] = CONNECT_PROTOCOL_VERSION
-
-    compression_header = (
-        CONNECT_STREAMING_HEADER_COMPRESSION
-        if stream
-        else CONNECT_UNARY_HEADER_COMPRESSION
-    )
-    accept_compression_header = (
-        CONNECT_STREAMING_HEADER_ACCEPT_COMPRESSION
-        if stream
-        else CONNECT_UNARY_HEADER_ACCEPT_COMPRESSION
-    )
-
-    if accept_compression is not None:
-        headers[accept_compression_header] = ", ".join(accept_compression)
-    else:
-        headers[accept_compression_header] = get_accept_encoding()
-    if send_compression is not None:
-        headers[compression_header] = send_compression.name()
-    else:
-        headers.pop(compression_header, None)
-    headers["content-type"] = (
-        f"{CONNECT_STREAMING_CONTENT_TYPE_PREFIX if stream else CONNECT_UNARY_CONTENT_TYPE_PREFIX}{codec.name()}"
-    )
-    if timeout_ms is not None:
-        headers["connect-timeout-ms"] = str(timeout_ms)
-
-    return RequestContext(
-        method=method,
-        http_method=http_method,
-        request_headers=headers,
-        timeout_ms=timeout_ms,
-    )
-
-
 def prepare_get_params(
     codec: Codec, request_data: bytes, headers: HttpxHeaders
 ) -> dict[str, str]:
@@ -139,7 +70,7 @@ def validate_response_content_encoding(
     return res
 
 
-def validate_response_content_type(
+def validate_unary_response(
     request_codec_name: str, status_code: int, response_content_type: str
 ) -> None:
     if status_code != HTTPStatus.OK:
@@ -196,3 +127,55 @@ def validate_stream_response_content_type(
         Code.INTERNAL,
         f"invalid content-type: '{response_content_type}'; expecting '{CONNECT_STREAMING_CONTENT_TYPE_PREFIX}{request_codec_name}'",
     )
+
+
+_stream_error_code_regex = re.compile(
+    r".*<StreamReset stream_id:\d+, error_code:(\d+), remote_reset:.*>.*"
+)
+
+
+# https://github.com/connectrpc/connect-go/blob/59cc6973156cd9164d6bea493b1d106ed894f2df/error.go#L393
+def maybe_map_stream_reset(
+    e: Exception, ctx: RequestContext[REQ, RES]
+) -> ConnectError | None:
+    if not isinstance(e, RemoteProtocolError):
+        return None
+
+    msg = str(e)
+    # httpx unfortunately flattens httpcore exceptions to strings, so we have
+    # to parse the error code back out of the message.
+    # https://github.com/encode/httpx/blob/ae1b9f66238f75ced3ced5e4485408435de10768/httpx/_transports/default.py#L117
+    match = _stream_error_code_regex.match(msg)
+    if not match:
+        return None
+
+    # Deferred import: h2 isn't needed (or necessarily installed) when httpx
+    # runs without HTTP/2 support.
+    from h2.errors import ErrorCodes  # noqa: PLC0415
+
+    match int(match.group(1)):
+        case (
+            ErrorCodes.NO_ERROR
+            | ErrorCodes.PROTOCOL_ERROR
+            | ErrorCodes.INTERNAL_ERROR
+            | ErrorCodes.FLOW_CONTROL_ERROR
+            | ErrorCodes.SETTINGS_TIMEOUT
+            | ErrorCodes.FRAME_SIZE_ERROR
+            | ErrorCodes.COMPRESSION_ERROR
+            | ErrorCodes.CONNECT_ERROR
+        ):
+            return ConnectError(Code.INTERNAL, msg)
+        case ErrorCodes.REFUSED_STREAM:
+            return ConnectError(Code.UNAVAILABLE, msg)
+        case ErrorCodes.CANCEL:
+            # Some servers use CANCEL when the deadline expires. We can't
+            # differentiate that from a normal cancel without checking our own
+            # deadline.
+            if (t := ctx.timeout_ms()) is not None and t <= 0:
+                return ConnectError(Code.DEADLINE_EXCEEDED, msg)
+            return ConnectError(Code.CANCELED, msg)
+        case ErrorCodes.ENHANCE_YOUR_CALM:
+            return ConnectError(Code.RESOURCE_EXHAUSTED, f"Bandwidth exhausted: {msg}")
+        case ErrorCodes.INADEQUATE_SECURITY:
+            return ConnectError(
+                Code.PERMISSION_DENIED, f"Transport protocol insecure: {msg}"
+            )
+
+    return None
diff --git a/src/connectrpc/_client_sync.py b/src/connectrpc/_client_sync.py
index bbf5d70..30b46fd 100644
--- a/src/connectrpc/_client_sync.py
+++ b/src/connectrpc/_client_sync.py
@@ -6,9 +6,10 @@
 import httpx
 from httpx import USE_CLIENT_DEFAULT, Timeout
 
+from connectrpc._protocol_grpc import GRPCClientProtocol
+
 from . 
import _client_shared from ._codec import Codec, get_proto_binary_codec, get_proto_json_codec -from ._envelope import EnvelopeReader from ._interceptor_sync import ( BidiStreamInterceptorSync, ClientStreamInterceptorSync, @@ -18,10 +19,7 @@ resolve_interceptors, ) from ._protocol import ConnectWireError -from ._protocol_connect import ( - CONNECT_STREAMING_HEADER_COMPRESSION, - ConnectEnvelopeWriter, -) +from ._protocol_connect import ConnectClientProtocol, ConnectEnvelopeWriter from ._response_metadata import handle_response_headers from .code import Code from .errors import ConnectError @@ -32,6 +30,7 @@ from types import TracebackType from ._compression import Compression + from ._envelope import EnvelopeReader from .method import MethodInfo from .request import Headers, RequestContext @@ -81,6 +80,7 @@ def __init__( address: str, *, proto_json: bool = False, + grpc: bool = False, accept_compression: Iterable[str] | None = None, send_compression: str | None = None, timeout_ms: int | None = None, @@ -116,6 +116,11 @@ def __init__( self._close_client = True self._closed = False + if grpc: + self._protocol = GRPCClientProtocol() + else: + self._protocol = ConnectClientProtocol() + interceptors = resolve_interceptors(interceptors) execute_unary = self._send_request_unary for interceptor in ( @@ -186,7 +191,7 @@ def execute_unary( timeout_ms: int | None = None, use_get: bool = False, ) -> RES: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="GET" if use_get else "POST", user_headers=headers, @@ -206,7 +211,7 @@ def execute_client_stream( headers: Headers | Mapping[str, str] | None = None, timeout_ms: int | None = None, ) -> RES: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="POST", user_headers=headers, @@ -226,7 +231,7 @@ def execute_server_stream( headers: Headers | Mapping[str, str] | None = None, timeout_ms: int | None = None, ) -> Iterator[RES]: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="POST", user_headers=headers, @@ -246,7 +251,7 @@ def execute_bidi_stream( headers: Headers | Mapping[str, str] | None = None, timeout_ms: int | None = None, ) -> Iterator[RES]: - ctx = _client_shared.create_request_context( + ctx = self._protocol.create_request_context( method=method, http_method="POST", user_headers=headers, @@ -259,6 +264,11 @@ def execute_bidi_stream( return self._execute_bidi_stream(request, ctx) def _send_request_unary(self, request: REQ, ctx: RequestContext[REQ, RES]) -> RES: + if isinstance(self._protocol, GRPCClientProtocol): + return _consume_single_response( + self._send_request_bidi_stream(iter([request]), ctx) + ) + request_headers = httpx.Headers(list(ctx.request_headers().allitems())) url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}" if (timeout_ms := ctx.timeout_ms()) is not None: @@ -287,14 +297,14 @@ def _send_request_unary(self, request: REQ, ctx: RequestContext[REQ, RES]) -> RE timeout=timeout, ) - _client_shared.validate_response_content_encoding( - resp.headers.get("content-encoding", "") - ) - _client_shared.validate_response_content_type( + self._protocol.validate_response( self._codec.name(), resp.status_code, resp.headers.get("content-type", ""), ) + # Decompression itself is handled by httpx, but we validate it + # by resolving it. 
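+        # As in the async client, gRPC unary calls are dispatched through the
+        # bidi-stream path above, so only the Connect protocol reaches this check.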
+        self._protocol.handle_response_compression(resp.headers, stream=False)
         handle_response_headers(resp.headers)
 
         if resp.status_code == 200:
@@ -339,6 +349,8 @@ def _send_request_bidi_stream(
             timeout = USE_CLIENT_DEFAULT
 
         stream_error: Exception | None = None
+        reader: EnvelopeReader | None = None
+        resp: httpx.Response | None = None
         try:
             request_data = _streaming_request_content(
                 request, self._codec, self._send_compression
@@ -351,16 +363,16 @@
                 content=request_data,
                 timeout=timeout,
             ) as resp:
-                compression = _client_shared.validate_response_content_encoding(
-                    resp.headers.get(CONNECT_STREAMING_HEADER_COMPRESSION, "")
-                )
-                _client_shared.validate_stream_response_content_type(
-                    self._codec.name(), resp.headers.get("content-type", "")
-                )
                 handle_response_headers(resp.headers)
 
                 if resp.status_code == 200:
-                    reader = EnvelopeReader(
+                    self._protocol.validate_stream_response(
+                        self._codec.name(), resp.headers.get("content-type", "")
+                    )
+                    compression = self._protocol.handle_response_compression(
+                        resp.headers, stream=True
+                    )
+                    reader = self._protocol.create_envelope_reader(
                         ctx.method().output,
                         self._codec,
                         compression,
@@ -372,6 +384,14 @@
                     except ConnectError as e:
                         stream_error = e
                         raise
+                    # For sync, we rely on the HTTP client to handle the timeout,
+                    # but the client we currently use for gRPC does not correctly
+                    # propagate RST_STREAM, which servers use to signal timeouts,
+                    # so we also check the deadline ourselves.
+                    # https://github.com/hyperium/hyper/issues/3681#issuecomment-3734084436
+                    if (t := ctx.timeout_ms()) is not None and t <= 0:
+                        raise TimeoutError
+                    reader.handle_response_complete(resp)
                 else:
                     raise ConnectWireError.from_response(resp).to_exception()
         except (httpx.TimeoutException, TimeoutError) as e:
@@ -385,6 +405,13 @@
             # the stream error here.
             if stream_error is not None:
                 raise stream_error from None
+
+            if rst_err := _client_shared.maybe_map_stream_reset(e, ctx):
+                # It is possible for a reset to come with trailers, which should
+                # be used. 
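+                # handle_response_complete feeds whatever trailers did arrive,
+                # keeping rst_err's code while taking the message and details
+                # from the trailers when present.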
+ if reader and resp: + reader.handle_response_complete(resp, rst_err) + raise rst_err from e raise ConnectError(Code.UNAVAILABLE, str(e)) from e diff --git a/src/connectrpc/_envelope.py b/src/connectrpc/_envelope.py index ea2bf33..d238aa0 100644 --- a/src/connectrpc/_envelope.py +++ b/src/connectrpc/_envelope.py @@ -1,20 +1,20 @@ from __future__ import annotations -import json import struct from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, Generic, TypeVar from ._compression import Compression, IdentityCompression -from ._protocol import ConnectWireError -from ._response_metadata import handle_response_trailers from .code import Code from .errors import ConnectError if TYPE_CHECKING: from collections.abc import Iterator + import httpx + from ._codec import Codec + from ._protocol import ConnectWireError from .request import Headers _RES = TypeVar("_RES") @@ -49,8 +49,8 @@ def _read_messages(self) -> Iterator[_RES]: if len(self._buffer) < self._next_message_length + 5: return - compressed = self._buffer[0] & 0b01 != 0 - end_stream = self._buffer[0] & 0b10 != 0 + prefix_byte = self._buffer[0] + compressed = prefix_byte & 0b01 != 0 message_data = self._buffer[5 : 5 + self._next_message_length] self._buffer = self._buffer[5 + self._next_message_length :] @@ -72,18 +72,7 @@ def _read_messages(self) -> Iterator[_RES]: f"message is larger than configured max {self._read_max_bytes}", ) - if end_stream: - end_stream_message: dict = json.loads(message_data) - metadata = end_stream_message.get("metadata") - if metadata: - handle_response_trailers(metadata) - error = end_stream_message.get("error") - if error: - # Most likely a bug in the protocol, handling of unknown code is different for unary - # and streaming. - raise ConnectWireError.from_dict( - error, 500, Code.UNKNOWN - ).to_exception() + if self.handle_end_message(prefix_byte, message_data): return res = self._message_class() @@ -95,6 +84,21 @@ def _read_messages(self) -> Iterator[_RES]: self._next_message_length = int.from_bytes(self._buffer[1:5], "big") + def handle_end_message( + self, prefix_byte: int, message_data: bytes | bytearray + ) -> bool: + """For client protocols with an end message like Connect and gRPC-Web, handle the end message. + Returns True if the end message was handled, False otherwise. + """ + return False + + def handle_response_complete( + self, response: httpx.Response, e: ConnectError | None = None + ) -> None: + """Handle any client finalization needed when the response is complete. + This is typically used to process trailers for gRPC. + """ + class EnvelopeWriter(ABC, Generic[_T]): def __init__(self, codec: Codec[_T, Any], compression: Compression | None) -> None: diff --git a/src/connectrpc/_protocol.py b/src/connectrpc/_protocol.py index ef511f7..1de614d 100644 --- a/src/connectrpc/_protocol.py +++ b/src/connectrpc/_protocol.py @@ -13,13 +13,13 @@ from .errors import ConnectError if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Iterable, Mapping, Sequence import httpx from ._codec import Codec from ._compression import Compression - from ._envelope import EnvelopeWriter + from ._envelope import EnvelopeReader, EnvelopeWriter from .method import MethodInfo from .request import Headers, RequestContext @@ -214,6 +214,51 @@ def negotiate_stream_compression( ... 
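+# ClientProtocol captures the client-side behavior that differs between the
+# Connect and gRPC wire protocols: request-header setup, response validation,
+# compression resolution, and envelope reading. The sync and async clients pick
+# an implementation at construction time and share the rest of the HTTP code.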
+class ClientProtocol(Protocol): + def create_request_context( + self, + *, + method: MethodInfo[REQ, RES], + http_method: str, + user_headers: Headers | Mapping[str, str] | None, + timeout_ms: int | None, + codec: Codec, + stream: bool, + accept_compression: Iterable[str] | None, + send_compression: Compression | None, + ) -> RequestContext[REQ, RES]: + """Creates a RequestContext for the given method and headers.""" + ... + + def validate_response( + self, request_codec_name: str, status_code: int, response_content_type: str + ) -> None: + """Validates a unary response""" + ... + + def validate_stream_response( + self, request_codec_name: str, response_content_type: str + ) -> None: + """Validates a streaming response""" + ... + + def handle_response_compression( + self, headers: httpx.Headers, *, stream: bool + ) -> Compression: + """Handles response compression based on the response headers.""" + ... + + def create_envelope_reader( + self, + message_class: type[RES], + codec: Codec, + compression: Compression, + read_max_bytes: int | None, + ) -> EnvelopeReader[RES]: + """Creates the EnvelopeReader to read response messages.""" + ... + + class HTTPException(Exception): """An HTTP exception returned directly before starting the connect protocol.""" diff --git a/src/connectrpc/_protocol_connect.py b/src/connectrpc/_protocol_connect.py index e6cf01d..c81de6e 100644 --- a/src/connectrpc/_protocol_connect.py +++ b/src/connectrpc/_protocol_connect.py @@ -5,15 +5,28 @@ from http import HTTPStatus from typing import TYPE_CHECKING, Any, TypeVar -from ._compression import IdentityCompression, get_compression, negotiate_compression -from ._envelope import EnvelopeWriter +from ._codec import CODEC_NAME_JSON, CODEC_NAME_JSON_CHARSET_UTF8, Codec +from ._compression import ( + IdentityCompression, + get_accept_encoding, + get_available_compressions, + get_compression, + negotiate_compression, +) +from ._envelope import EnvelopeReader, EnvelopeWriter from ._protocol import ConnectWireError, HTTPException +from ._response_metadata import handle_response_trailers +from ._version import __version__ from .code import Code from .errors import ConnectError from .method import IdempotencyLevel, MethodInfo from .request import Headers, RequestContext if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + import httpx + from ._codec import Codec from ._compression import Compression @@ -31,6 +44,9 @@ CONNECT_STREAMING_HEADER_ACCEPT_COMPRESSION = "connect-accept-encoding" +_DEFAULT_CONNECT_USER_AGENT = f"connectrpc/{__version__}" + + def codec_name_from_content_type(content_type: str, *, stream: bool) -> str: prefix = ( CONNECT_STREAMING_CONTENT_TYPE_PREFIX @@ -134,3 +150,167 @@ def end(self, user_trailers: Headers, error: ConnectWireError | None) -> bytes: if self._compression: data = self._compression.compress(data) return struct.pack(">BI", self._prefix | 0b10, len(data)) + data + + +class ConnectClientProtocol: + def create_request_context( + self, + *, + method: MethodInfo[REQ, RES], + http_method: str, + user_headers: Headers | Mapping[str, str] | None, + timeout_ms: int | None, + codec: Codec, + stream: bool, + accept_compression: Iterable[str] | None, + send_compression: Compression | None, + ) -> RequestContext[REQ, RES]: + match user_headers: + case Headers(): + # Copy to prevent modification if user keeps reference + # TODO: Optimize + headers = Headers(tuple(user_headers.allitems())) + case None: + headers = Headers() + case _: + headers = Headers(user_headers) + + if "user-agent" not 
in headers: + headers["user-agent"] = _DEFAULT_CONNECT_USER_AGENT + headers["connect-protocol-version"] = CONNECT_PROTOCOL_VERSION + + compression_header = ( + CONNECT_STREAMING_HEADER_COMPRESSION + if stream + else CONNECT_UNARY_HEADER_COMPRESSION + ) + accept_compression_header = ( + CONNECT_STREAMING_HEADER_ACCEPT_COMPRESSION + if stream + else CONNECT_UNARY_HEADER_ACCEPT_COMPRESSION + ) + + if accept_compression is not None: + headers[accept_compression_header] = ", ".join(accept_compression) + else: + headers[accept_compression_header] = get_accept_encoding() + if send_compression is not None: + headers[compression_header] = send_compression.name() + else: + headers.pop(compression_header, None) + headers["content-type"] = ( + f"{CONNECT_STREAMING_CONTENT_TYPE_PREFIX if stream else CONNECT_UNARY_CONTENT_TYPE_PREFIX}{codec.name()}" + ) + if timeout_ms is not None: + headers["connect-timeout-ms"] = str(timeout_ms) + + return RequestContext( + method=method, + http_method=http_method, + request_headers=headers, + timeout_ms=timeout_ms, + ) + + def validate_response( + self, request_codec_name: str, status_code: int, response_content_type: str + ) -> None: + if status_code != HTTPStatus.OK: + # Error responses must be JSON-encoded + if response_content_type in ( + f"{CONNECT_UNARY_CONTENT_TYPE_PREFIX}{CODEC_NAME_JSON}", + f"{CONNECT_UNARY_CONTENT_TYPE_PREFIX}{CODEC_NAME_JSON_CHARSET_UTF8}", + ): + return + raise ConnectWireError.from_http_status(status_code).to_exception() + + if not response_content_type.startswith(CONNECT_UNARY_CONTENT_TYPE_PREFIX): + raise ConnectError( + Code.UNKNOWN, + f"invalid content-type: '{response_content_type}'; expecting '{CONNECT_UNARY_CONTENT_TYPE_PREFIX}{request_codec_name}'", + ) + + response_codec_name = codec_name_from_content_type( + response_content_type, stream=False + ) + if response_codec_name == request_codec_name: + return + + if ( + response_codec_name == CODEC_NAME_JSON + and request_codec_name == CODEC_NAME_JSON_CHARSET_UTF8 + ) or ( + response_codec_name == CODEC_NAME_JSON_CHARSET_UTF8 + and request_codec_name == CODEC_NAME_JSON + ): + # Both are JSON + return + + raise ConnectError( + Code.INTERNAL, + f"invalid content-type: '{response_content_type}'; expecting '{CONNECT_UNARY_CONTENT_TYPE_PREFIX}{request_codec_name}'", + ) + + def validate_stream_response( + self, request_codec_name: str, response_content_type: str + ) -> None: + if not response_content_type.startswith(CONNECT_STREAMING_CONTENT_TYPE_PREFIX): + raise ConnectError( + Code.UNKNOWN, + f"invalid content-type: '{response_content_type}'; expecting '{CONNECT_STREAMING_CONTENT_TYPE_PREFIX}{request_codec_name}'", + ) + + response_codec_name = response_content_type[ + len(CONNECT_STREAMING_CONTENT_TYPE_PREFIX) : + ] + if response_codec_name != request_codec_name: + raise ConnectError( + Code.INTERNAL, + f"invalid content-type: '{response_content_type}'; expecting '{CONNECT_STREAMING_CONTENT_TYPE_PREFIX}{request_codec_name}'", + ) + + def handle_response_compression( + self, headers: httpx.Headers, *, stream: bool + ) -> Compression: + compression_header = ( + CONNECT_STREAMING_HEADER_COMPRESSION + if stream + else CONNECT_UNARY_HEADER_COMPRESSION + ) + encoding = headers.get(compression_header) + if not encoding: + return IdentityCompression() + res = get_compression(encoding) + if not res: + raise ConnectError( + Code.INTERNAL, + f"unknown encoding '{encoding}'; accepted encodings are {', '.join(get_available_compressions())}", + ) + return res + + def create_envelope_reader( + self, 
+        message_class: type[RES],
+        codec: Codec,
+        compression: Compression,
+        read_max_bytes: int | None,
+    ) -> EnvelopeReader[RES]:
+        return ConnectEnvelopeReader(message_class, codec, compression, read_max_bytes)
+
+
+class ConnectEnvelopeReader(EnvelopeReader[RES]):
+    def handle_end_message(
+        self, prefix_byte: int, message_data: bytes | bytearray
+    ) -> bool:
+        end_stream = prefix_byte & 0b10 != 0
+        if not end_stream:
+            return False
+        end_stream_message: dict = json.loads(message_data)
+        metadata = end_stream_message.get("metadata")
+        if metadata:
+            handle_response_trailers(metadata)
+        error = end_stream_message.get("error")
+        if error:
+            # Most likely a bug in the protocol: handling of unknown codes
+            # differs between unary and streaming.
+            raise ConnectWireError.from_dict(error, 500, Code.UNKNOWN).to_exception()
+        return True
diff --git a/src/connectrpc/_protocol_grpc.py b/src/connectrpc/_protocol_grpc.py
index ad8656a..09a0d6b 100644
--- a/src/connectrpc/_protocol_grpc.py
+++ b/src/connectrpc/_protocol_grpc.py
@@ -1,18 +1,32 @@
 from __future__ import annotations
 
+import sys
 import urllib.parse
-from base64 import b64encode
+from base64 import b64decode, b64encode
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, TypeVar
 
-from ._compression import get_compression, negotiate_compression
-from ._envelope import EnvelopeWriter
+from ._compression import (
+    IdentityCompression,
+    get_accept_encoding,
+    get_available_compressions,
+    get_compression,
+    negotiate_compression,
+)
+from ._envelope import EnvelopeReader, EnvelopeWriter
 from ._gen.status_pb2 import Status
 from ._protocol import ConnectWireError, HTTPException
+from ._response_metadata import handle_response_trailers
+from ._version import __version__
 from .code import Code
+from .errors import ConnectError
 from .request import Headers, RequestContext
 
 if TYPE_CHECKING:
+    from collections.abc import Iterable, Mapping
+
+    import httpx
+
     from ._codec import Codec
     from ._compression import Compression
     from .method import MethodInfo
@@ -27,6 +41,8 @@
 GRPC_HEADER_COMPRESSION = "grpc-encoding"
 GRPC_HEADER_ACCEPT_COMPRESSION = "grpc-accept-encoding"
 
+# Keep only the version number: sys.version embeds a newline before the
+# compiler info, which is not valid in a header value.
+_DEFAULT_GRPC_USER_AGENT = f"grpc-python-connect/{__version__} ({sys.version.split()[0]})"
+
 
 class GRPCServerProtocol:
     def create_request_context(
@@ -133,6 +149,185 @@ def end(self, user_trailers: Headers, error: ConnectWireError | None) -> Headers:
         return trailers
 
 
+class GRPCClientProtocol:
+    def create_request_context(
+        self,
+        *,
+        method: MethodInfo[REQ, RES],
+        http_method: str,
+        user_headers: Headers | Mapping[str, str] | None,
+        timeout_ms: int | None,
+        codec: Codec,
+        stream: bool,
+        accept_compression: Iterable[str] | None,
+        send_compression: Compression | None,
+    ) -> RequestContext[REQ, RES]:
+        match user_headers:
+            case Headers():
+                # Copy to prevent modification if user keeps reference
+                # TODO: Optimize
+                headers = Headers(tuple(user_headers.allitems()))
+            case None:
+                headers = Headers()
+            case _:
+                headers = Headers(user_headers)
+        headers["te"] = "trailers"
+        if "user-agent" not in headers:
+            headers["user-agent"] = _DEFAULT_GRPC_USER_AGENT
+
+        if accept_compression is not None:
+            headers[GRPC_HEADER_ACCEPT_COMPRESSION] = ",".join(accept_compression)
+        else:
+            headers[GRPC_HEADER_ACCEPT_COMPRESSION] = get_accept_encoding()
+        if send_compression is not None:
+            headers[GRPC_HEADER_COMPRESSION] = send_compression.name()
+        else:
+            headers.pop(GRPC_HEADER_COMPRESSION, None)
+        headers["content-type"] = f"{GRPC_CONTENT_TYPE_PREFIX}{codec.name()}"
+        if timeout_ms is not None:
+            headers[GRPC_HEADER_TIMEOUT] = _serialize_timeout(timeout_ms)
+
+        return RequestContext(
+            method=method,
+            http_method=http_method,
+            request_headers=headers,
+            timeout_ms=timeout_ms,
+        )
+
+    def validate_response(
+        self, request_codec_name: str, status_code: int, response_content_type: str
+    ) -> None:
+        raise NotImplementedError
+
+    def validate_stream_response(
+        self, request_codec_name: str, response_content_type: str
+    ) -> None:
+        if not (
+            response_content_type == GRPC_CONTENT_TYPE_DEFAULT
+            or response_content_type.startswith(GRPC_CONTENT_TYPE_PREFIX)
+        ):
+            raise ConnectError(
+                Code.UNKNOWN,
+                f"invalid content-type: '{response_content_type}'; expecting '{GRPC_CONTENT_TYPE_PREFIX}{request_codec_name}'",
+            )
+        if response_content_type.startswith(GRPC_CONTENT_TYPE_PREFIX):
+            response_codec_name = response_content_type[len(GRPC_CONTENT_TYPE_PREFIX) :]
+        else:
+            response_codec_name = "proto"
+        if response_codec_name != request_codec_name:
+            raise ConnectError(
+                Code.INTERNAL,
+                f"invalid content-type: '{response_content_type}'; expecting '{GRPC_CONTENT_TYPE_PREFIX}{request_codec_name}'",
+            )
+
+    def handle_response_compression(
+        self, headers: httpx.Headers, *, stream: bool
+    ) -> Compression:
+        encoding = headers.get(GRPC_HEADER_COMPRESSION)
+        if not encoding:
+            return IdentityCompression()
+        res = get_compression(encoding)
+        if not res:
+            raise ConnectError(
+                Code.INTERNAL,
+                f"unknown encoding '{encoding}'; accepted encodings are {', '.join(get_available_compressions())}",
+            )
+        return res
+
+    def create_envelope_reader(
+        self,
+        message_class: type[RES],
+        codec: Codec,
+        compression: Compression,
+        read_max_bytes: int | None,
+    ) -> EnvelopeReader[RES]:
+        return GRPCEnvelopeReader(message_class, codec, compression, read_max_bytes)
+
+
+class GRPCEnvelopeReader(EnvelopeReader[RES]):
+    def __init__(
+        self,
+        message_class: type[RES],
+        codec: Codec,
+        compression: Compression,
+        read_max_bytes: int | None,
+    ) -> None:
+        super().__init__(message_class, codec, compression, read_max_bytes)
+        self._read_message = False
+
+    def handle_end_message(
+        self, prefix_byte: int, message_data: bytes | bytearray
+    ) -> bool:
+        # It's a coincidence that this method is called exactly when there is a
+        # body and not when there isn't. Somewhat hacky, but it's the easiest
+        # way to handle a body arriving with no trailers, which the conformance
+        # tests verify.
+        self._read_message = True
+        return False
+
+    def handle_response_complete(
+        self, response: httpx.Response, e: ConnectError | None = None
+    ) -> None:
+        get_trailers = response.extensions.get("get_trailers")
+        if not get_trailers:
+            msg = "gRPC client support requires using an HTTPX-compatible client that supports trailers"
+            raise RuntimeError(msg)
+        # Go ahead and feed HTTP trailers regardless of gRPC semantics.
+        trailers: httpx.Headers = get_trailers()
+        handle_response_trailers({k: trailers.get_list(k) for k in trailers})
+
+        # Now handle gRPC trailers. They are the HTTP trailers if a body was
+        # present, or the HTTP headers if there was no body.
+        grpc_status = trailers.get("grpc-status")
+        if grpc_status is None:
+            # If a body message was read, we do not fall back to the response headers
+            if self._read_message:
+                raise e or ConnectError(Code.INTERNAL, "missing grpc-status trailer")
+            trailers = response.headers
+
+        grpc_status = trailers.get("grpc-status")
+        if grpc_status is None:
+            raise e or ConnectError(Code.INTERNAL, "missing grpc-status trailer")
+
+        # e is present for RST_STREAM. 
We prioritize its code while reading message and details + # from trailers when available. + code = e.code if e else None + if grpc_status != "0": + message = trailers.get("grpc-message", "") + if grpc_status_details := trailers.get("grpc-status-details-bin"): + status = Status() + status.ParseFromString(b64decode(grpc_status_details + "===")) + connect_code = code or _grpc_status_to_connect.get( + str(status.code), Code.UNKNOWN + ) + raise ConnectError(connect_code, status.message, details=status.details) + connect_code = code or _grpc_status_to_connect.get( + grpc_status, Code.UNKNOWN + ) + raise ConnectError(connect_code, urllib.parse.unquote(message)) + + +_GRPC_TIMEOUT_MAX_VALUE = 1e8 + + +def _serialize_timeout(timeout_ms: int) -> str: + if timeout_ms <= 0: + return "0n" + + # The gRPC protocol limits timeouts to 8 characters (not counting the unit), + # so timeouts must be strictly less than 1e8 of the appropriate unit. + + if timeout_ms < _GRPC_TIMEOUT_MAX_VALUE: + size, unit = 1, "m" + elif timeout_ms < _GRPC_TIMEOUT_MAX_VALUE * 1000: + size, unit = 1 * 1000, "S" + elif timeout_ms < _GRPC_TIMEOUT_MAX_VALUE * 60 * 1000: + size, unit = 60 * 1000, "M" + else: + size, unit = 60 * 60 * 1000, "H" + + return f"{timeout_ms // size}{unit}" + + _connect_status_to_grpc = { Code.CANCELED: "1", Code.UNKNOWN: "2", @@ -151,3 +346,5 @@ def end(self, user_trailers: Headers, error: ConnectWireError | None) -> Headers Code.DATA_LOSS: "15", Code.UNAUTHENTICATED: "16", } + +_grpc_status_to_connect = {v: k for k, v in _connect_status_to_grpc.items()} diff --git a/test/test_errors.py b/test/test_errors.py index 03b21b3..8e07207 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -155,7 +155,13 @@ async def record_response(response) -> None: ), pytest.param( 200, - {"text": "weird encoding", "headers": {"content-encoding": "weird"}}, + { + "text": "weird encoding", + "headers": { + "content-type": "application/proto", + "content-encoding": "weird", + }, + }, Code.INTERNAL, "unknown encoding 'weird'; accepted encodings are gzip, br, zstd, identity", id="bad encoding", diff --git a/uv.lock b/uv.lock index e822d71..147ea34 100644 --- a/uv.lock +++ b/uv.lock @@ -466,6 +466,7 @@ dev = [ { name = "mkdocs" }, { name = "mkdocs-material" }, { name = "mkdocstrings", extra = ["python"] }, + { name = "pyqwest" }, { name = "pyright", extra = ["nodejs"] }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -501,6 +502,7 @@ dev = [ { name = "mkdocs", specifier = "==1.6.1" }, { name = "mkdocs-material", specifier = "==9.6.20" }, { name = "mkdocstrings", extras = ["python"], specifier = "==0.30.1" }, + { name = "pyqwest", specifier = "==0.1.0" }, { name = "pyright", extras = ["nodejs"], specifier = "==1.1.405" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -1855,6 +1857,56 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/81/ef2b1dfd1862567d573a4fdbc9f969067621764fbb74338496840a1d2977/pyopenssl-25.3.0-py3-none-any.whl", hash = "sha256:1fda6fc034d5e3d179d39e59c1895c9faeaf40a79de5fc4cbbfbe0d36f4a77b6", size = 57268, upload-time = "2025-09-17T00:32:19.474Z" }, ] +[[package]] +name = "pyqwest" +version = "0.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/99/af/0a40903f7774f6426d162e706eb827927cd93fb426db25185f5420723bd2/pyqwest-0.1.0.tar.gz", hash = "sha256:3d46d41c490dd427578852284a8a0a288c9c2fe100fb2d7a995b24fbe5bdcec3", size = 388361, upload-time = "2026-01-11T12:48:59.424Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/b8/34/3f9a3ecc7d07fe5235c8ce66dd508725b5e07ac478d3875d5ed1ab08d84d/pyqwest-0.1.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fde67a58bf2818334ec28078f0ad136bf949853781fba853ff1f2ea14383c7cf", size = 4643938, upload-time = "2026-01-11T12:47:52.006Z" }, + { url = "https://files.pythonhosted.org/packages/ea/25/419831910dd2482d72447e33caa51de72c27eeb387aba856c45cb974b0d6/pyqwest-0.1.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:381c9791367c0fa0412d30e36d565ee5c852f17f8a0039fc105b0948872f2d20", size = 4946931, upload-time = "2026-01-11T12:47:53.915Z" }, + { url = "https://files.pythonhosted.org/packages/ae/46/85f1f8d7ece18bb353bc747f74bbd35e83a8cc81ca89fef6f5299968c9e3/pyqwest-0.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:595feda76a57b4e66e40aba1ff8311240279007557772c8c1ba1e4619aea91a7", size = 4964379, upload-time = "2026-01-11T12:47:55.372Z" }, + { url = "https://files.pythonhosted.org/packages/2a/43/7bd95cba1a3aa3e4bff030ae4702ce9f77d79f3b5a96d904d58f7f7ac8d9/pyqwest-0.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:771fde2a689dd91d86e594e260157894e9c20bb7ed361d24df52bebc0518de7d", size = 5134230, upload-time = "2026-01-11T12:47:57.161Z" }, + { url = "https://files.pythonhosted.org/packages/ca/2c/353fc236322dad943e90605d261f8b80e2713417b12555d5c844cb87716e/pyqwest-0.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2edc31a3fac5793e0fe4bf9f985667d3e449c22a919c1a099e7b2cce4bde36fb", size = 5272762, upload-time = "2026-01-11T12:47:58.775Z" }, + { url = "https://files.pythonhosted.org/packages/c1/6d/04e11fa0758260497c9c8b41fa88f9573323bd6872884bae083bdd644e27/pyqwest-0.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:03a90a60081af25eabd381e8ef738e76f2bfe4ca0d1fb97b33f27784c900270d", size = 4135779, upload-time = "2026-01-11T12:48:00.295Z" }, + { url = "https://files.pythonhosted.org/packages/19/d2/bf9ca7f42571b9803ccf9efe6f859bb5855ba0bd139b24e3fd14163517d2/pyqwest-0.1.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:440ace438c318022581d6f4dbb4db126c27eac3e833be81492019e6b7c756ffd", size = 4642506, upload-time = "2026-01-11T12:48:01.905Z" }, + { url = "https://files.pythonhosted.org/packages/00/c7/a740b87d628a063d80a81ad98bdf4ce53d364cb52a2e1fa7246baa99b914/pyqwest-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:62e52b49399cfae41a1691bf2908c21cf33d0661a2c398ff11262650508825a7", size = 4949646, upload-time = "2026-01-11T12:48:03.258Z" }, + { url = "https://files.pythonhosted.org/packages/e8/67/5f87096e56ec2d6deca1bd3b14238229fe98c5e9c61acf1c23ea535d0e86/pyqwest-0.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:752512dff7e815ad24fc8cbc967da9c9ca4dbea95f90994655bf42fd68b3a97d", size = 4967856, upload-time = "2026-01-11T12:48:04.747Z" }, + { url = "https://files.pythonhosted.org/packages/ba/2e/2e6e1740a430adab5dd8d89aa2416449dad561a4943b4c08e48a65b70db3/pyqwest-0.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:b04c3bbb7257c294ad17e99a16910e5d7fed8fe584e214b37de04e74553d841c", size = 5137749, upload-time = "2026-01-11T12:48:06.62Z" }, + { url = "https://files.pythonhosted.org/packages/e2/3d/89131413f7463407fec88045bcaf02f9b0b4cb2e3283b72d6fc6c3d4335c/pyqwest-0.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:32f40b0abbacc3f55d07b65d40f7040ddb02216b6f97519a2c1665c9699fe6fd", size = 5276383, upload-time = "2026-01-11T12:48:08.082Z" }, + { url = 
"https://files.pythonhosted.org/packages/e0/57/972b4b56a465dc66a735da0c781c65f1b4d65a8834bdf260f5e60e38b17c/pyqwest-0.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:b7bc7a2c0214a8b1371ed2e007fe5a2218be5d2eaf8406ed3c8240ee76844730", size = 4142489, upload-time = "2026-01-11T12:48:09.959Z" }, + { url = "https://files.pythonhosted.org/packages/58/20/429ae13e269a6042ce9ef1b1f459edd470c667395f849ee4c96e4dcfd3de/pyqwest-0.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:c0b9623e7b9531e57d564de94a512db106f612881923bf073f6cb9726e1a1fa2", size = 4640061, upload-time = "2026-01-11T12:48:11.557Z" }, + { url = "https://files.pythonhosted.org/packages/cd/8e/e9d7fbbea7de86bb77eccaf8ca25e38738847d64781aa2440cf75044c351/pyqwest-0.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:29df3d76dd21dea00013495812f789ab5d34b380897e1c32f48a90fef7977a0b", size = 4954669, upload-time = "2026-01-11T12:48:13.092Z" }, + { url = "https://files.pythonhosted.org/packages/f2/36/7cd96800470cf9d3de1418c6268f9cd7d8ccffceb74c7f1b5701e3249cfd/pyqwest-0.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:085c578c62f94b490e3544ba5b81da40a4b6428cc2be05d5f5c9b8fc1ed2bf28", size = 4971272, upload-time = "2026-01-11T12:48:14.867Z" }, + { url = "https://files.pythonhosted.org/packages/fb/f6/63c87973f4b29fd343313562190de41299e3a92e06cf47dc240730308c06/pyqwest-0.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:deaf53bf89e0add516e92fcb54403125708ba987ad63647154ee77fed2b9b491", size = 5140426, upload-time = "2026-01-11T12:48:16.692Z" }, + { url = "https://files.pythonhosted.org/packages/c4/18/ca072fa26af6af7adcf8ed9aeb8b4c8dae33da40c8b71b3b29fc183d8169/pyqwest-0.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:2d68989d760b61c19480125cc88c4a6eb389cd6655dcdfbd09431db256a865b1", size = 5282165, upload-time = "2026-01-11T12:48:18.286Z" }, + { url = "https://files.pythonhosted.org/packages/2e/7b/b59b9230a3b909ecccf55b93baf1da202058eba8cdc2765e0247ab75eb91/pyqwest-0.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:8c99af411b43fb5075d5529842c5ca5ee980cd5b665cbc67878c28ae7d174e75", size = 4144298, upload-time = "2026-01-11T12:48:19.726Z" }, + { url = "https://files.pythonhosted.org/packages/50/dc/d3e4ee3ae335b8de8b5d487b29a7f4d285ba42bbd0220ec43c28c6515cc8/pyqwest-0.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:18a8da35b0f36c0f9b3a6d843d79e27c02e975c4e3b4709294e28d8adf89697d", size = 4639203, upload-time = "2026-01-11T12:48:21.181Z" }, + { url = "https://files.pythonhosted.org/packages/d3/6a/afa3348738fac1a7eb0655dfe2a184468dea9e7fcbf6893dea7628c03157/pyqwest-0.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f544df941383ef9a8610675526468a02ce6806e1d581f346a3b0fd4346ac26db", size = 4954570, upload-time = "2026-01-11T12:48:22.792Z" }, + { url = "https://files.pythonhosted.org/packages/9d/65/81dc839a591c6fedbce4049ef6016ca3b06e842fffac57fcf826a26f53bd/pyqwest-0.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c8bb1f89c929c148e9c8b1052aa35773c5e80aeea3f0982ae4e579b41dc3627", size = 4971174, upload-time = "2026-01-11T12:48:24.495Z" }, + { url = "https://files.pythonhosted.org/packages/87/56/f4a0ed74017f9b21f10847661311376698de309f3586c5a147f248de7fbb/pyqwest-0.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:22a293cd6e4bd35cc023fd8de2b0d00a7ef513040fd0377d2ee274a7fe4389ed", size = 5139044, upload-time = "2026-01-11T12:48:25.992Z" }, + { url = 
"https://files.pythonhosted.org/packages/37/ab/f8e320e2a165e99c9177a6aae97ca9f5a539b93f6d6f7602c7ae3687fcdf/pyqwest-0.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:9593ba2c8db6a3fe353a6731b884e76a8212ecfa86af4a4380a4945fafc71c63", size = 5282032, upload-time = "2026-01-11T12:48:27.541Z" }, + { url = "https://files.pythonhosted.org/packages/d3/0e/b157099a9da34fd9c9df80a38e593a86786c876985c7ef2268f5c9b0ee8b/pyqwest-0.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:82f0112bd2aadfc8b26cc8bcaf8805bc7cea47e8779cec2bfe99bcac80990e26", size = 4143984, upload-time = "2026-01-11T12:48:29.329Z" }, + { url = "https://files.pythonhosted.org/packages/33/26/bba3f380655f17e29536706556492d88618f11eef51967ad20b724380585/pyqwest-0.1.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:a5cc9dff673c3f08550a163130e426a0733edd2b686ee44ee1b67e2a629ae9f3", size = 4630320, upload-time = "2026-01-11T12:48:30.829Z" }, + { url = "https://files.pythonhosted.org/packages/22/bf/4288e904d5eb4acb7429548d82001fa9869050b219efc01f409c6d0706f8/pyqwest-0.1.0-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f7b41771d2ae2b8a1761ef3eff15326338e239149f161d5fe75a9085db239a22", size = 4948466, upload-time = "2026-01-11T12:48:32.295Z" }, + { url = "https://files.pythonhosted.org/packages/0e/f6/2299a8bfb991f96e8f8a67645cafff909b1b41417d2713dc49a443e4475c/pyqwest-0.1.0-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8b2122c2504351e8a697a3f09da6b17ea4c1260f0db47346461da06f5ca7973", size = 4964217, upload-time = "2026-01-11T12:48:34.058Z" }, + { url = "https://files.pythonhosted.org/packages/f2/b2/80d74a8bea12fe8f783a99c282ee1e0e47877d4708b37cb55f4a2f6a3650/pyqwest-0.1.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:fd750245298d792374ad1429d2c8b0f2ad9b20a71c9e47dc03aa85d7589f48eb", size = 5133456, upload-time = "2026-01-11T12:48:35.5Z" }, + { url = "https://files.pythonhosted.org/packages/0b/aa/7a88e915b81194f18cdc378df7fa5fb7e9ab7a85b11fb35e49c0724a1078/pyqwest-0.1.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:3edc41632c084edfc9f9d7c93a84244a0cc09e8976890deebcf69fd6a0d9c291", size = 5275121, upload-time = "2026-01-11T12:48:37.124Z" }, + { url = "https://files.pythonhosted.org/packages/c1/79/acc6cb2806b4d5f3b97546fc2f81d215551bc0a361de1dcf239cc2786e02/pyqwest-0.1.0-cp314-cp314-win_amd64.whl", hash = "sha256:6a3804cca0f034912af586b7b95f7feaa37ea7ee524e15a62dc0eea12386d523", size = 4136557, upload-time = "2026-01-11T12:48:38.704Z" }, + { url = "https://files.pythonhosted.org/packages/93/35/23bd0d80f963eab113691546ef367db6e3e9360ee9db2379f5e3e73a42af/pyqwest-0.1.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:2a67e1da99985f782deda17b3221badb9ad5a424016ec56f01d97f04b52fe530", size = 4634702, upload-time = "2026-01-11T12:48:40.135Z" }, + { url = "https://files.pythonhosted.org/packages/0c/ab/b9468cc4f3438f6d1eda39a83c179432f34ac416c48c214d60eb7e776753/pyqwest-0.1.0-cp314-cp314t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:299579fce767a7e5d8bf70cdf6ae8cab4c70a20b3e1d6d3ee852d8d9b519160c", size = 4942523, upload-time = "2026-01-11T12:48:41.821Z" }, + { url = "https://files.pythonhosted.org/packages/31/03/4ac7305619bfeb5157d33d3659529c7f01d31d770451e6010b7f87df8276/pyqwest-0.1.0-cp314-cp314t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:28bab85f7829e179bb494f44f151246e9232dadd549d567f8b2bc9cf8f6af95b", size = 4962586, upload-time = "2026-01-11T12:48:43.369Z" }, + { url = 
"https://files.pythonhosted.org/packages/d6/9b/36aee889a09f7573e2b94af96c4c33eda75b23fab0685cc40463c88a0dbf/pyqwest-0.1.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:1cb50d27252e9eeb309d1f11a321d65cc7d8bc151ff3357d2045dca2a38d1876", size = 5128760, upload-time = "2026-01-11T12:48:44.995Z" }, + { url = "https://files.pythonhosted.org/packages/44/12/4dd546bfdfd98cb1afff12c08fc2a750703a1dd47454e04d924129ae26bb/pyqwest-0.1.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:386f48f76e7499703b7edb4d26112b92943adc7fa3c09186cb67bba15bf7fa3b", size = 5272754, upload-time = "2026-01-11T12:48:46.528Z" }, + { url = "https://files.pythonhosted.org/packages/10/0b/8417fbf765ecb892378aaa4592c4c2f8e2c65e2c8aa95280c7588424396f/pyqwest-0.1.0-cp314-cp314t-win_amd64.whl", hash = "sha256:a6cf36d60a8626d1907c71494334eb3ee837c792d3cff2d243257217e5ce2720", size = 4131758, upload-time = "2026-01-11T12:48:48.068Z" }, + { url = "https://files.pythonhosted.org/packages/40/20/f148f999a9b139b9d2ae02f93328bf0060f1642395811b0eb13c70c2df73/pyqwest-0.1.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:b59e0435680c8192e9d54fb578f94cad98c744594343e6b5d73ec4be3cb4f2f7", size = 4641938, upload-time = "2026-01-11T12:48:49.511Z" }, + { url = "https://files.pythonhosted.org/packages/97/55/935e525c3a5f74dbe704e1a232fa6804d5a861d5f72333e2ec1d4a9cf70a/pyqwest-0.1.0-pp311-pypy311_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8bd2102ce661d7bc06b4e3c3653ee9ed4e45920625a3e2f66a661a547f264012", size = 4952562, upload-time = "2026-01-11T12:48:50.963Z" }, + { url = "https://files.pythonhosted.org/packages/76/cc/8be13bf27d0736b95f0b33cc1b5a81d86eae20a49d3eb0f5a40fd74e5168/pyqwest-0.1.0-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd4307c832572098e520d1d2370aa55eb16e0a03197c5803e7683e875415999e", size = 4968141, upload-time = "2026-01-11T12:48:52.428Z" }, + { url = "https://files.pythonhosted.org/packages/4c/33/4c3c787ab22f2fd2c194641932799361b0f94306c593a1deecb79e89cbd1/pyqwest-0.1.0-pp311-pypy311_pp73-musllinux_1_2_aarch64.whl", hash = "sha256:b88e4a13c5e49499687a1e5e19ce1c5c3a8160b4558d10db23099b72c11ad47b", size = 5139146, upload-time = "2026-01-11T12:48:54.023Z" }, + { url = "https://files.pythonhosted.org/packages/e0/0b/a501ffb7796fbe1bb1e0230cecfb6d2a70327e0fbaf9e464d555205e7554/pyqwest-0.1.0-pp311-pypy311_pp73-musllinux_1_2_x86_64.whl", hash = "sha256:acce5ee2eb1e8b875b5fd3037f290152565884c999c969ef1403f0ccf6ba85fe", size = 5277425, upload-time = "2026-01-11T12:48:56.274Z" }, + { url = "https://files.pythonhosted.org/packages/3b/2a/d44c6ea9fbebb160ba395cfabfd627b7362f56381286e4383b669e9b9902/pyqwest-0.1.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:494720404202173bb7a12df05fbbf3a63d7a7843a9bd6895cf62bcd82680a0d6", size = 4139236, upload-time = "2026-01-11T12:48:58.131Z" }, +] + [[package]] name = "pyright" version = "1.1.405"