From e4a1b4b489f168c8ff571ae7b4b7221e69cfafa2 Mon Sep 17 00:00:00 2001 From: tsubakiky Date: Fri, 11 Jul 2025 01:20:32 +0900 Subject: [PATCH 1/2] debug --- conformance/client_runner.py | 29 ++++++++- conformance/run-testcase.txt | 1 + spec/conformance-error.txt | 62 +++++++++++++++++++ src/connect/client.py | 6 +- src/connect/connect.py | 37 ++++++++--- .../protocol_connect/connect_client.py | 26 ++++++-- src/connect/protocol_grpc/grpc_client.py | 55 +++++++++++----- src/connect/protocol_grpc/unmarshaler.py | 6 +- 8 files changed, 188 insertions(+), 34 deletions(-) create mode 100644 conformance/run-testcase.txt create mode 100644 spec/conformance-error.txt diff --git a/conformance/client_runner.py b/conformance/client_runner.py index 3b43719..eb3e8e9 100755 --- a/conformance/client_runner.py +++ b/conformance/client_runner.py @@ -254,6 +254,9 @@ async def delayed_abort() -> None: UnaryRequest( content=req, headers=headers, + metadata={ + "test_name": msg.test_name, + }, ), CallOptions( timeout=msg.timeout_ms / 1000, @@ -279,6 +282,7 @@ async def _reqs() -> AsyncGenerator[service_pb2.ClientStreamRequest]: async for req in reqs: if msg.request_delay_ms > 0: await asyncio.sleep(msg.request_delay_ms / 1000) + print(f"[{msg.test_name}] Sending request", file=sys.stderr) yield req if msg.cancel.HasField("before_close_send"): @@ -293,7 +297,13 @@ async def delayed_abort() -> None: asyncio.create_task(delayed_abort()) async with getattr(client, msg.method)( - StreamRequest(content=_reqs(), headers=headers), + StreamRequest( + content=_reqs(), + headers=headers, + metadata={ + "test_name": msg.test_name, + }, + ), CallOptions( timeout=msg.timeout_ms / 1000, abort_event=abort_event, @@ -319,7 +329,13 @@ async def delayed_abort() -> None: headers = to_connect_headers(msg.request_headers) async with getattr(client, msg.method)( - StreamRequest(content=reqs, headers=headers), + StreamRequest( + content=reqs, + headers=headers, + metadata={ + "test_name": msg.test_name, + }, + ), CallOptions( timeout=msg.timeout_ms / 1000, abort_event=abort_event, @@ -358,12 +374,19 @@ async def _reqs() -> AsyncGenerator[service_pb2.ClientStreamRequest]: async for req in reqs: if msg.request_delay_ms > 0: await asyncio.sleep(msg.request_delay_ms / 1000) + print(f"[{msg.test_name}] Sending request", file=sys.stderr) yield req headers = to_connect_headers(msg.request_headers) async with getattr(client, msg.method)( - StreamRequest(content=_reqs(), headers=headers), + StreamRequest( + content=_reqs(), + headers=headers, + metadata={ + "test_name": msg.test_name, + }, + ), CallOptions( timeout=msg.timeout_ms / 1000, abort_event=abort_event, diff --git a/conformance/run-testcase.txt b/conformance/run-testcase.txt new file mode 100644 index 0000000..dddd62e --- /dev/null +++ b/conformance/run-testcase.txt @@ -0,0 +1 @@ +Errors/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_GZIP/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-no-responses diff --git a/spec/conformance-error.txt b/spec/conformance-error.txt new file mode 100644 index 0000000..e52b7a4 --- /dev/null +++ b/spec/conformance-error.txt @@ -0,0 +1,62 @@ +``` +cd conformance +❯ connectconformance -vv --trace --conf ./client_config.yaml --mode client -- uv run python client_runner.py +``` + +# testcase +- request: + testName: bidi-stream/half-duplex/error-with-no-responses + streamType: STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM + requestHeaders: + - name: X-Conformance-Test + value: ["Value1","Value2"] + requestMessages: + 
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest + responseDefinition: + responseHeaders: + - name: x-custom-header + value: ["foo"] + error: + code: CODE_INTERNAL + message: "bidi half duplex stream failed" + responseTrailers: + - name: x-custom-trailer + value: ["bing"] + - "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest + requestData: "dGVzdCByZXNwb25zZQ==" + +# flaky failed +FAILED: Errors/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_GZIP/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-no-responses: + actual error {code: 14 (unavailable), message: "http: invalid Read on closed Body"} does not match expected code 13 (internal) + actual error {code: 14 (unavailable), message: "http: invalid Read on closed Body"} does not match expected message "bidi half duplex stream failed" + actual error contain 0 details; expecting 1 +---- HTTP Trace ---- + request> 0.000ms POST http://.../connectrpc.conformance.v1.ConformanceService/BidiStream HTTP/1.1 + request> Accept-Encoding: identity + request> Content-Type: application/grpc-web+proto + request> Grpc-Accept-Encoding: gzip + request> Grpc-Encoding: gzip + request> User-Agent: connect-python/0.0.1 (Python/3.13) + request> X-Conformance-Test: Value1, Value2 + request> X-Test-Case-Name: Errors/HTTPVersion:1/Protocol:PROTOCOL_GRPC_WEB/Codec:CODEC_PROTO/Compression:COMPRESSION_GZIP/TLS:false/(grpc server impl)/bidi-stream/half-duplex/error-with-no-responses + request> X-User-Agent: connect-python/0.0.1 (Python/3.13) + request> + request> 1.545ms message #1: prefix: flags=1, len=99 + request> message #1: data: 99/99 bytes +response< 1.671ms 200 OK +response< Access-Control-Expose-Headers: Server, X-Custom-Header, Vary, Date, Content-Type, Grpc-Encoding, grpc-status, grpc-message +response< Content-Type: application/grpc-web+proto +response< Grpc-Encoding: gzip +response< Server: connectconformance-grpcserver/v1.0.4 +response< Vary: Origin +response< X-Custom-Header: foo +response< + request> 2.714ms message #2: prefix: flags=1, len=35 + request> message #2: data: 35/35 bytes + request> 3.677ms body end (err=http: invalid Read on closed Body) +-------------------- + +Total cases: 4282 +4279 passed, 2 failed +(Another 1 failed as expected due to being known failures/flakes.) +Error: Process completed with exit code 1. 
diff --git a/src/connect/client.py b/src/connect/client.py index 11b53df..b0899ef 100644 --- a/src/connect/client.py +++ b/src/connect/client.py @@ -284,9 +284,11 @@ def on_request_send(r: httpcore.Request) -> None: conn.on_request_send(on_request_send) - await conn.send(request.messages, call_options.timeout, call_options.abort_event) + await conn.send(request.messages, call_options.timeout, call_options.abort_event, request.matadata) - response = await receive_stream_response(conn, output, request.spec, call_options.abort_event) + response = await receive_stream_response( + conn, output, request.spec, call_options.abort_event, request.matadata + ) return response stream_func = apply_interceptors(_stream_func, options.interceptors) diff --git a/src/connect/connect.py b/src/connect/connect.py index ec6b6c3..69da6aa 100644 --- a/src/connect/connect.py +++ b/src/connect/connect.py @@ -101,6 +101,7 @@ class RequestCommon: _peer: Peer _headers: Headers _method: str + matadata: dict[str, Any] | None = None def __init__( self, @@ -108,6 +109,7 @@ def __init__( peer: Peer | None = None, headers: Headers | None = None, method: str | None = None, + matadata: dict[str, Any] | None = None, ) -> None: """Initializes the RPC context. @@ -134,6 +136,7 @@ def __init__( self._peer = peer if peer else Peer(address=None, protocol="", query={}) self._headers = headers if headers is not None else Headers() self._method = method if method else HTTPMethod.POST.value + self.matadata = matadata if matadata is not None else {} @property def spec(self) -> Spec: @@ -229,6 +232,7 @@ def __init__( peer: Peer | None = None, headers: Headers | None = None, method: str | None = None, + metadata: dict[str, Any] | None = None, ) -> None: """Initializes a new request. @@ -240,7 +244,13 @@ def __init__( headers: The headers associated with the request. Defaults to None. method: The method name for the request. Defaults to None. """ - super().__init__(spec, peer, headers, method) + super().__init__( + spec, + peer, + headers, + method, + metadata, + ) self._messages = content if isinstance(content, AsyncIterable) else aiterate([content]) @property @@ -298,6 +308,7 @@ def __init__( peer: Peer | None = None, headers: Headers | None = None, method: str | None = None, + metadata: dict[str, Any] | None = None, ) -> None: """Initializes the request object. @@ -308,7 +319,7 @@ def __init__( headers (Headers | None, optional): The request headers. Defaults to None. method (str | None, optional): The request method. Defaults to None. """ - super().__init__(spec, peer, headers, method) + super().__init__(spec, peer, headers, method, metadata) self._message = content @property @@ -874,7 +885,9 @@ def peer(self) -> Peer: raise NotImplementedError() @abc.abstractmethod - def receive(self, message: Any, abort_event: asyncio.Event | None) -> AsyncIterator[Any]: + def receive( + self, message: Any, abort_event: asyncio.Event | None, metadata: dict[str, Any] | None = None + ) -> AsyncIterator[Any]: """Asynchronously receives a stream of messages. This method sends an initial message and then listens for a stream of @@ -912,7 +925,11 @@ def request_headers(self) -> Headers: @abc.abstractmethod async def send( - self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None + self, + messages: AsyncIterable[Any], + timeout: float | None, + abort_event: asyncio.Event | None, + metadata: dict[str, Any] | None = None, ) -> None: """Asynchronously sends a stream of messages. 
@@ -1096,7 +1113,11 @@ async def receive_unary_response[T]( async def receive_stream_response[T]( - conn: StreamingClientConn, t: type[T], spec: Spec, abort_event: asyncio.Event | None + conn: StreamingClientConn, + t: type[T], + spec: Spec, + abort_event: asyncio.Event | None, + metadata: dict[str, Any] | None = None, ) -> StreamResponse[T]: """Receives a streaming response from the server. @@ -1116,12 +1137,14 @@ async def receive_stream_response[T]( headers, and trailers. """ if spec.stream_type == StreamType.ClientStream: - single_message = await ensure_single(conn.receive(t, abort_event)) + single_message = await ensure_single(conn.receive(t, abort_event, metadata)) return StreamResponse( AsyncDataStream[T](aiterate([single_message]), conn.aclose), conn.response_headers, conn.response_trailers ) else: return StreamResponse( - AsyncDataStream[T](conn.receive(t, abort_event), conn.aclose), conn.response_headers, conn.response_trailers + AsyncDataStream[T](conn.receive(t, abort_event, metadata), conn.aclose), + conn.response_headers, + conn.response_trailers, ) diff --git a/src/connect/protocol_connect/connect_client.py b/src/connect/protocol_connect/connect_client.py index 22ec274..8292ab9 100644 --- a/src/connect/protocol_connect/connect_client.py +++ b/src/connect/protocol_connect/connect_client.py @@ -331,7 +331,12 @@ async def _receive_messages(self, message: Any) -> AsyncIterator[Any]: obj = await self.unmarshaler.unmarshal(message) yield obj - def receive(self, message: Any, _abort_event: asyncio.Event | None) -> AsyncIterator[Any]: + def receive( + self, + message: Any, + _abort_event: asyncio.Event | None, + metadata: dict[str, Any] | None = None, + ) -> AsyncIterator[Any]: """Receives messages asynchronously based on the provided input message. Args: @@ -367,7 +372,11 @@ def on_request_send(self, fn: EventHook) -> None: self._event_hooks["request"].append(fn) async def send( - self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None + self, + messages: AsyncIterable[Any], + timeout: float | None, + abort_event: asyncio.Event | None, + metadata: dict[str, Any] | None = None, ) -> None: """Sends a single message asynchronously using either HTTP GET or POST, with optional timeout and abort support. @@ -708,7 +717,12 @@ def on_request_send(self, fn: EventHook) -> None: """ self._event_hooks["request"].append(fn) - async def receive(self, message: Any, abort_event: asyncio.Event | None = None) -> AsyncIterator[Any]: + async def receive( + self, + message: Any, + abort_event: asyncio.Event | None = None, + metadata: dict[str, Any] | None = None, + ) -> AsyncIterator[Any]: """Asynchronously receives and yields messages from the unmarshaler, handling stream control and errors. Args: @@ -757,7 +771,11 @@ async def receive(self, message: Any, abort_event: asyncio.Event | None = None) raise ConnectError("missing end stream message", Code.INVALID_ARGUMENT) async def send( - self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None + self, + messages: AsyncIterable[Any], + timeout: float | None, + abort_event: asyncio.Event | None, + metadata: dict[str, Any] | None = None, ) -> None: """Sends a stream of messages asynchronously to the server using HTTP POST. 
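In the Connect-protocol connections above, the new metadata parameter is only threaded through to keep every implementation aligned with the abstract StreamingClientConn signature; the debug output in this patch comes from the gRPC path below. On the caller side, the conformance runner attaches the test name when it builds the request, roughly as in this sketch (the import path, payloads, and timeout are illustrative assumptions; the keyword arguments are the ones added by the patch):

```python
# Caller-side sketch mirroring the client_runner.py changes in this patch.
import asyncio
from collections.abc import AsyncIterator

from connect.connect import CallOptions, StreamRequest  # assumed import path


async def request_messages() -> AsyncIterator[bytes]:
    # Placeholder payloads; the real runner yields protobuf request messages here.
    yield b"first request"
    yield b"second request"


request = StreamRequest(
    content=request_messages(),
    metadata={"test_name": "bidi-stream/half-duplex/error-with-no-responses"},
)
options = CallOptions(timeout=5.0, abort_event=asyncio.Event())
```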
diff --git a/src/connect/protocol_grpc/grpc_client.py b/src/connect/protocol_grpc/grpc_client.py index 6cb9115..581135a 100644 --- a/src/connect/protocol_grpc/grpc_client.py +++ b/src/connect/protocol_grpc/grpc_client.py @@ -3,6 +3,7 @@ import asyncio import contextlib import functools +import sys from collections.abc import AsyncIterable, AsyncIterator, Callable, Mapping from http import HTTPMethod from typing import Any @@ -271,28 +272,40 @@ def peer(self) -> Peer: """ return self._peer - async def receive(self, message: Any, abort_event: asyncio.Event | None) -> AsyncIterator[Any]: + async def receive( + self, message: Any, abort_event: asyncio.Event | None, metadata: dict[str, Any] | None = None + ) -> AsyncIterator[Any]: + import sys + + test_name = metadata.get("test_name", "unknown") if metadata else "unknown" """Receives a message and processes it.""" trailer_received = False - async for obj, end in self.unmarshaler.unmarshal(message): - if abort_event and abort_event.is_set(): - raise ConnectError("receive operation aborted", Code.CANCELED) + try: + async for obj, end in self.unmarshaler.unmarshal(message, metadata): + if abort_event and abort_event.is_set(): + raise ConnectError("receive operation aborted", Code.CANCELED) - if end: - if trailer_received: - raise ConnectError("received extra end stream trailer", Code.INVALID_ARGUMENT) + if end: + if trailer_received: + raise ConnectError("received extra end stream trailer", Code.INVALID_ARGUMENT) - trailer_received = True - if self.unmarshaler.web_trailers is None: - raise ConnectError("trailer not received", Code.INVALID_ARGUMENT) + trailer_received = True + if self.unmarshaler.web_trailers is None: + raise ConnectError("trailer not received", Code.INVALID_ARGUMENT) - continue + continue - if trailer_received: - raise ConnectError("protocol error: received extra message after trailer", Code.INVALID_ARGUMENT) + if trailer_received: + raise ConnectError("protocol error: received extra message after trailer", Code.INVALID_ARGUMENT) - yield obj + print(f"[{test_name}] Received message", file=sys.stderr) + yield obj + except Exception as exc: + print(f"[{test_name}] Exception during unmarshaling: {exc}", file=sys.stderr) + raise exc + + print(f"[{test_name}] Received end of stream", file=sys.stderr) if callable(self.receive_trailers): self.receive_trailers() @@ -303,11 +316,13 @@ async def receive(self, message: Any, abort_event: asyncio.Event | None) -> Asyn del self._response_headers[HEADER_CONTENT_TYPE] server_error = grpc_error_from_trailer(self.response_trailers) + print(f"[{test_name}] 1 server_error: {server_error}", file=sys.stderr) if server_error: server_error.metadata = self.response_headers.copy() raise server_error server_error = grpc_error_from_trailer(self.response_trailers) + print(f"[{test_name}] 2 server_error: {server_error}", file=sys.stderr) if server_error: server_error.metadata = self.response_headers.copy() server_error.metadata.update(self.response_trailers.copy()) @@ -336,7 +351,11 @@ def request_headers(self) -> Headers: return self._request_headers async def send( - self, messages: AsyncIterable[Any], timeout: float | None, abort_event: asyncio.Event | None + self, + messages: AsyncIterable[Any], + timeout: float | None, + abort_event: asyncio.Event | None, + metadata: dict[str, Any] | None = None, ) -> None: """Sends a gRPC request asynchronously using HTTP/2 via httpcore. 
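The try/except added to the gRPC receive() above boils down to wrapping the unmarshaling loop so that each yielded message, any exception raised while unmarshaling, and the end of the stream are all reported under the test tag. The same idea as a standalone async-generator wrapper (the name and signature are illustrative; the patch inlines this directly in receive()):

```python
# Standalone sketch of the logging added to receive(): tag each message,
# surface any unmarshaling exception, and mark the end of the stream.
import sys
from collections.abc import AsyncIterator


async def logged_stream[T](stream: AsyncIterator[T], test_name: str) -> AsyncIterator[T]:
    try:
        async for obj in stream:
            print(f"[{test_name}] Received message", file=sys.stderr)
            yield obj
    except Exception as exc:
        print(f"[{test_name}] Exception during unmarshaling: {exc}", file=sys.stderr)
        raise
    print(f"[{test_name}] Received end of stream", file=sys.stderr)
```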
@@ -416,11 +435,13 @@ async def send( self.unmarshaler.stream = BoundAsyncStream(response.stream) self.receive_trailers = functools.partial(self._receive_trailers, response) - await self._validate_response(response) + await self._validate_response(response, metadata) - async def _validate_response(self, response: httpcore.Response) -> None: + async def _validate_response(self, response: httpcore.Response, metadata: dict[str, Any] | None = None) -> None: + test_name = metadata.get("test_name", "unknown") if metadata else "unknown" response_headers = Headers(response.headers) if response.status != 200: + print(f"[{test_name}] Response status: {response.status}", file=sys.stderr) raise ConnectError( f"HTTP {response.status}", code_from_http_status(response.status), diff --git a/src/connect/protocol_grpc/unmarshaler.py b/src/connect/protocol_grpc/unmarshaler.py index f92bf7b..48a4cb5 100644 --- a/src/connect/protocol_grpc/unmarshaler.py +++ b/src/connect/protocol_grpc/unmarshaler.py @@ -60,7 +60,7 @@ def __init__( self.web = web self._web_trailers = None - async def unmarshal(self, message: Any) -> AsyncIterator[tuple[Any, bool]]: + async def unmarshal(self, message: Any, metadata: dict[str, Any] | None = None) -> AsyncIterator[tuple[Any, bool]]: """Asynchronously unmarshals a message and yields objects along with an end flag. Iterates over the result of the superclass's `unmarshal` method, processing each object and its corresponding end flag. @@ -76,6 +76,7 @@ async def unmarshal(self, message: Any) -> AsyncIterator[tuple[Any, bool]]: Raises: ConnectError: If the envelope is empty or has invalid flags. """ + test_name = metadata.get("test_name", "unknown") if metadata else "unknown" async for obj, end in super().unmarshal(message): if end: env = self.last @@ -104,6 +105,9 @@ async def unmarshal(self, message: Any) -> AsyncIterator[tuple[Any, bool]]: else: trailers[name] = value + import sys + + print(f"[{test_name}] Received trailers", file=sys.stderr) self._web_trailers = trailers yield obj, end From c2550faff36ba0b87012f3ce17fa3ff0b04fe9a2 Mon Sep 17 00:00:00 2001 From: tsubakiky Date: Fri, 11 Jul 2025 01:36:12 +0900 Subject: [PATCH 2/2] github: run conformance 10 times --- .github/workflows/conformance.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/conformance.yaml b/.github/workflows/conformance.yaml index b53a11e..665abe8 100644 --- a/.github/workflows/conformance.yaml +++ b/.github/workflows/conformance.yaml @@ -59,6 +59,7 @@ jobs: client: runs-on: ubuntu-2404-4-cores + timeout-minutes: 30 steps: # https://github.com/actions/checkout @@ -93,7 +94,10 @@ jobs: run: | go install -v connectrpc.com/conformance/cmd/connectconformance@latest - - name: Run the connectconformance for client + - name: Run the connectconformance for client (10 times) run: | - connectconformance --trace --conf ./client_config.yaml --mode client --known-flaky @./client_known_flaky.yaml -- uv run python client_runner.py + for i in {1..10}; do + echo "Running conformance test iteration $i" + connectconformance --trace --conf ./client_config.yaml --mode client --known-flaky @./client_known_flaky.yaml -- uv run python client_runner.py + done
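The same repeat-run can be driven locally when chasing the flake; a sketch that shells out to the command used in the workflow and stops at the first failing iteration (the ten-iteration count matches the CI step, while the `conformance/` working directory and the fail-fast handling are assumptions):

```python
# Sketch: repeat the conformance client run until it fails, mirroring the CI
# loop above. The command line is copied from the workflow; cwd and fail-fast
# handling are assumptions for local use.
import subprocess
import sys

CMD = [
    "connectconformance",
    "--trace",
    "--conf", "./client_config.yaml",
    "--mode", "client",
    "--known-flaky", "@./client_known_flaky.yaml",
    "--",
    "uv", "run", "python", "client_runner.py",
]

for i in range(1, 11):
    print(f"Running conformance test iteration {i}", file=sys.stderr)
    result = subprocess.run(CMD, cwd="conformance")
    if result.returncode != 0:
        sys.exit(result.returncode)  # preserve the first failing iteration's exit code
```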