From bf2882e1d592ec9d4b65bcd2b00fb80fef9fcea0 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Sat, 6 Dec 2025 02:03:49 +0000 Subject: [PATCH 1/3] test: add streaming DSL and fix receive race Introduces a robust testing DSL for gRPC streaming scenarios and fixes a concurrency bug in the legacy test utilities. Key Improvements: - **DSL:** Adds high-level request builders (`ReqLLM`, `ReqRaw`) to create readable, intent-based test cases. - **Race Fix:** Refactors `StreamedRequest` to use a serial read loop. Previously, concurrent `Recv()` calls caused undefined behavior that masked server responses. - **Robustness:** Updates `GetFreePort` to prevent race conditions during parallel test execution. **Note on EPP Tests:** The race condition fix in `StreamedRequest` stabilized the test output, revealing an incorrect assertion in `test/integration/epp/hermetic_test.go`. The server correctly returns a generic 400 error without echoing the invalid body (security best practice), but the test expected the body to be echoed. The assertion has been updated to match the correct, stable server behavior. 
--- test/integration/epp/hermetic_test.go | 4 +- test/integration/util.go | 386 ++++++++++++++++++-------- 2 files changed, 278 insertions(+), 112 deletions(-) diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index bfb7f99f5..2bc7ba55d 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -228,7 +228,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) { wantErr: false, wantResponses: integrationutils.NewImmediateErrorResponse( envoyTypePb.StatusCode_BadRequest, - "inference gateway: BadRequest - Error unmarshaling request body: no healthy upstream", + "inference gateway: BadRequest - Error unmarshaling request body", ), }, { @@ -804,7 +804,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) { }, }, }, - wantResponses: []*extProcPb.ProcessingResponse{}, + wantResponses: nil, pods: newPodStates( podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, ), diff --git a/test/integration/util.go b/test/integration/util.go index 9f0dcde7b..8b17f38e4 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -14,11 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package integration provides shared utilities, request builders, and assertions for the hermetic integration test +// suites of the Gateway API Inference Extension. +// +// It encapsulates the complexity of constructing Envoy ext_proc Protobuf messages and managing gRPC streams, allowing +// individual test suites (e.g., test/integration/epp, test/integration/bbr) to focus on behavioral assertions rather +// than protocol boilerplate. 
package integration import ( + "context" "encoding/json" + "errors" + "fmt" "io" + "net" "strconv" "testing" "time" @@ -30,77 +40,89 @@ import ( "google.golang.org/protobuf/types/known/structpb" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) -const ( - headerKeyContentLength = "Content-Length" -) +// --- Request Builders (High-Level DSL) --- -func SendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - t.Logf("Sending request: %v", req) - if err := client.Send(req); err != nil { - t.Logf("Failed to send request %+v: %v", req, err) - return nil, err - } +// ReqLLM creates a sequence of gRPC messages representing a standard, streamed LLM inference request. +// It generates: +// 1. A RequestHeaders message containing standard inference headers (Objective, Model Rewrite, Request ID). +// 2. A RequestBody message containing the JSON payload with EndOfStream=true. +// +// Use this for the majority of "Happy Path" EPP and BBR streaming tests. +func ReqLLM(logger logr.Logger, prompt, model, targetModel string) []*extProcPb.ProcessingRequest { + return GenerateStreamedRequestSet(logger, prompt, model, targetModel, nil) +} - res, err := client.Recv() - if err != nil { - t.Logf("Failed to receive: %v", err) - return nil, err - } - t.Logf("Received response %+v", res) - return res, err +// ReqLLMUnary creates a single `ProcessingRequest` containing a complete JSON body. +// This simulates a scenario where Envoy has buffered the request body before sending it to the external processor +// (unary mode). +// +// Use this for tests where `streaming: false` or when testing legacy buffered behavior. 
+func ReqLLMUnary(logger logr.Logger, prompt, model string) *extProcPb.ProcessingRequest { + return GenerateRequest(logger, prompt, model, nil) } -// StreamedRequest sends a series of requests and collects the specified number of responses. -func StreamedRequest( - t *testing.T, - client extProcPb.ExternalProcessor_ProcessClient, - requests []*extProcPb.ProcessingRequest, - expectedResponses int, -) ([]*extProcPb.ProcessingResponse, error) { - for _, req := range requests { - t.Logf("Sending request: %v", req) - if err := client.Send(req); err != nil { - t.Logf("Failed to send request %+v: %v", req, err) - return nil, err - } - } +// ReqRaw creates a custom sequence of gRPC messages with specific headers and arbitrary body chunks. +// This is a lower-level helper useful for testing edge cases, such as: +// - Invalid JSON bodies (to test error handling). +// - Fragmentation (split bodies) to ensure the processor handles accumulation correctly. +// - Protocol attacks (e.g., missing headers). +func ReqRaw(headers map[string]string, bodyChunks ...string) []*extProcPb.ProcessingRequest { + reqs := []*extProcPb.ProcessingRequest{} - responses := []*extProcPb.ProcessingResponse{} - for i := range expectedResponses { - type recvResult struct { - res *extProcPb.ProcessingResponse - err error - } - recvChan := make(chan recvResult, 1) + // 1. Headers Phase + hList := []*envoyCorev3.HeaderValue{} + for k, v := range headers { + hList = append(hList, &envoyCorev3.HeaderValue{Key: k, Value: v}) + } + reqs = append(reqs, &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_RequestHeaders{ + RequestHeaders: &extProcPb.HttpHeaders{ + Headers: &envoyCorev3.HeaderMap{Headers: hList}, + }, + }, + }) - go func() { - res, err := client.Recv() - recvChan <- recvResult{res, err} - }() + // 2. 
Body Phase (Chunks) + for i, chunk := range bodyChunks { + reqs = append(reqs, &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_RequestBody{ + RequestBody: &extProcPb.HttpBody{ + Body: []byte(chunk), + EndOfStream: i == len(bodyChunks)-1, + }, + }, + }) + } + return reqs +} - select { - case <-time.After(10 * time.Second): - t.Logf("Timeout waiting for response %d of %d", i+1, expectedResponses) - return responses, nil - case result := <-recvChan: - if result.err != nil { - if result.err == io.EOF { - return responses, nil - } - t.Logf("Failed to receive: %v", result.err) - return nil, result.err - } - t.Logf("Received response %+v", result.res) - responses = append(responses, result.res) - } +// ReqHeaderOnly creates a request sequence consisting solely of headers, with no body. +// It sets `EndOfStream: true` on the headers frame. +// +// Use this for testing non-inference traffic, such as GET requests, health checks, or requests that should bypass the +// inference processor logic. +func ReqHeaderOnly(headers map[string]string) []*extProcPb.ProcessingRequest { + hList := []*envoyCorev3.HeaderValue{} + for k, v := range headers { + hList = append(hList, &envoyCorev3.HeaderValue{Key: k, Value: v}) } - return responses, nil + return []*extProcPb.ProcessingRequest{{ + Request: &extProcPb.ProcessingRequest_RequestHeaders{ + RequestHeaders: &extProcPb.HttpHeaders{ + Headers: &envoyCorev3.HeaderMap{Headers: hList}, + EndOfStream: true, + }, + }, + }} } +// --- Request Builders (Low-Level Generators) --- + +// GenerateRequest constructs a `ProcessingRequest` containing a JSON-formatted LLM payload. +// It accepts a filterMetadata slice to inject Envoy Dynamic Metadata (used for subset load balancing). 
func GenerateRequest(logger logr.Logger, prompt, model string, filterMetadata []string) *extProcPb.ProcessingRequest { j := map[string]any{ "prompt": prompt, @@ -111,11 +133,13 @@ func GenerateRequest(logger logr.Logger, prompt, model string, filterMetadata [] j["model"] = model } + // Panic on marshal failure is acceptable in test helpers as it implies a bug in the test code itself. llmReq, err := json.Marshal(j) if err != nil { - logutil.Fatal(logger, err, "Failed to unmarshal LLM request") + panic(fmt.Errorf("failed to marshal LLM request: %w", err)) } - req := &extProcPb.ProcessingRequest{ + + return &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_RequestBody{ RequestBody: &extProcPb.HttpBody{Body: llmReq, EndOfStream: true}, }, @@ -123,47 +147,49 @@ func GenerateRequest(logger logr.Logger, prompt, model string, filterMetadata [] FilterMetadata: GenerateRequestMetadata(filterMetadata), }, } - return req } -func GenerateStreamedRequestSet(logger logr.Logger, prompt, model, targetModel string, filterMetadata []string) []*extProcPb.ProcessingRequest { +// GenerateStreamedRequestSet creates a slice of requests simulating an Envoy stream: +// 1. A Headers frame with standard Inference Extension headers. +// 2. A Body frame with the JSON payload. 
+func GenerateStreamedRequestSet( + logger logr.Logger, + prompt, model, targetModel string, + filterMetadata []string, +) []*extProcPb.ProcessingRequest { requests := []*extProcPb.ProcessingRequest{} + + // Headers + headers := []*envoyCorev3.HeaderValue{ + {Key: "hi", Value: "mom"}, + {Key: requtil.RequestIdHeaderKey, Value: "test-request-id"}, + } + if model != "" { + headers = append(headers, &envoyCorev3.HeaderValue{Key: metadata.ObjectiveKey, Value: model}) + } + if targetModel != "" { + headers = append(headers, &envoyCorev3.HeaderValue{Key: metadata.ModelNameRewriteKey, Value: targetModel}) + } + headerReq := &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_RequestHeaders{ RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &envoyCorev3.HeaderMap{ - Headers: []*envoyCorev3.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - { - Key: metadata.ObjectiveKey, - Value: model, - }, - { - Key: metadata.ModelNameRewriteKey, - Value: targetModel, - }, - { - Key: requtil.RequestIdHeaderKey, - Value: "test-request-id", - }, - }, - }, + Headers: &envoyCorev3.HeaderMap{Headers: headers}, }, }, + MetadataContext: &envoyCorev3.Metadata{ + FilterMetadata: GenerateRequestMetadata(filterMetadata), + }, } - - headerReq.MetadataContext = &envoyCorev3.Metadata{ - FilterMetadata: GenerateRequestMetadata(filterMetadata), - } - requests = append(requests, headerReq) + + // Body requests = append(requests, GenerateRequest(logger, prompt, model, filterMetadata)) return requests } +// GenerateRequestMetadata constructs the Envoy Dynamic Metadata structure. +// This is primarily used to inject "envoy.lb" subset keys for testing logic that depends on specific backend subsets. 
func GenerateRequestMetadata(filterMetadata []string) map[string]*structpb.Struct { requestMetadata := make(map[string]*structpb.Struct) interfaceList := make([]any, len(filterMetadata)) @@ -179,10 +205,19 @@ func GenerateRequestMetadata(filterMetadata []string) map[string]*structpb.Struc return requestMetadata } -// NewRequestBufferedResponse creates a complete set of responses for the request phase. -// It modifies request headers (e.g., for routing) and replaces the entire request body. -// It returns a slice of two messages, representing the complete buffered action. -func NewRequestBufferedResponse(destinationEndpoint string, rewrittenBody string, otherHeaders ...*envoyCorev3.HeaderValueOption) []*extProcPb.ProcessingResponse { +// --- Response Builders --- + +// NewRequestBufferedResponse creates a complete set of responses for the Request phase. +// It simulates the EPP deciding to: +// 1. Modify headers (e.g., set destination endpoint). +// 2. Replace the entire request body (e.g., rewriting the model name). +// +// It returns two messages: one for the Header response and one for the Body response. +func NewRequestBufferedResponse( + destinationEndpoint string, + rewrittenBody string, + otherHeaders ...*envoyCorev3.HeaderValueOption, +) []*extProcPb.ProcessingResponse { setHeaders := []*envoyCorev3.HeaderValueOption{ { Header: &envoyCorev3.HeaderValue{ @@ -210,7 +245,7 @@ func NewRequestBufferedResponse(destinationEndpoint string, rewrittenBody string }, }, }, - DynamicMetadata: makeMetadata(destinationEndpoint), + DynamicMetadata: makeDestinationMetadata(destinationEndpoint), } bodyResponse := &extProcPb.ProcessingResponse{ @@ -233,18 +268,21 @@ func NewRequestBufferedResponse(destinationEndpoint string, rewrittenBody string return []*extProcPb.ProcessingResponse{headerResponse, bodyResponse} } -// NewResponseBufferedResponse creates a complete set of responses for the response phase. -// It modifies response headers and replaces the entire response body. 
-// It is used when the processor buffers the upstream response before sending its own. -func NewResponseBufferedResponse(rewrittenBody string, headersToSet ...*envoyCorev3.HeaderValueOption) []*extProcPb.ProcessingResponse { +// NewResponseBufferedResponse creates a complete set of responses for the Response phase. +// It simulates the EPP modifying the upstream response before sending it to the client. +// It returns a Header mutation message followed by a Body replacement message. +func NewResponseBufferedResponse( + rewrittenBody string, + headersToSet ...*envoyCorev3.HeaderValueOption, +) []*extProcPb.ProcessingResponse { return []*extProcPb.ProcessingResponse{ NewResponseHeaders(headersToSet...), NewResponseStreamChunk(rewrittenBody, true), } } -// NewResponseHeaders creates a single response message to modify the response headers. -// This is the first step in either a buffered or streaming response modification. +// NewResponseHeaders creates a single response message to modify response headers. +// Use this when testing header mutations without body changes, or as the first step in a streamed response test. func NewResponseHeaders(headersToSet ...*envoyCorev3.HeaderValueOption) *extProcPb.ProcessingResponse { return &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseHeaders{ @@ -259,8 +297,8 @@ func NewResponseHeaders(headersToSet ...*envoyCorev3.HeaderValueOption) *extProc } } -// NewResponseStreamChunk creates a single response for one body chunk in a stream. -// This is used to test streaming behaviors like text/event-stream pass-through. +// NewResponseStreamChunk creates a single gRPC message representing one chunk of a streaming response. +// Use this to verify that EPP correctly passes through chunks (e.g., SSE events) or injects specific chunks. 
func NewResponseStreamChunk(body string, endOfStream bool) *extProcPb.ProcessingResponse { return &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseBody{ @@ -280,24 +318,148 @@ func NewResponseStreamChunk(body string, endOfStream bool) *extProcPb.Processing } } -// NewImmediateErrorResponse creates an immediate response to terminate processing. -// This is used for errors like load shedding or bad requests. +// NewImmediateErrorResponse creates a response that immediately terminates the request with a specific HTTP status code +// and body. +// Use this for testing Load Shedding (503), Rate Limiting (429), or Bad Request (400) logic. func NewImmediateErrorResponse(code envoyTypePb.StatusCode, body string) []*extProcPb.ProcessingResponse { - response := &extProcPb.ProcessingResponse{ + return []*extProcPb.ProcessingResponse{{ Response: &extProcPb.ProcessingResponse_ImmediateResponse{ ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: code, - }, - Body: []byte(body), + Status: &envoyTypePb.HttpStatus{Code: code}, + Body: []byte(body), }, }, + }} +} + +// --- Execution Helpers --- + +// SendRequest is a helper for Unary (One-Shot) test scenarios. +// It sends a single request message and waits for exactly one response. +func SendRequest( + t *testing.T, + client extProcPb.ExternalProcessor_ProcessClient, + req *extProcPb.ProcessingRequest, +) (*extProcPb.ProcessingResponse, error) { + t.Helper() + t.Logf("Sending request: %v", req) + + if err := client.Send(req); err != nil { + t.Logf("Failed to send request: %v", err) + return nil, err + } + + res, err := client.Recv() + if err != nil { + t.Logf("Failed to receive response: %v", err) + return nil, err + } + t.Logf("Received response: %+v", res) + return res, err +} + +// StreamedRequest is a helper for Full-Duplex Streaming test scenarios. +// It performs the following actions: +// 1. Sends all requests in the provided slice to the server. +// 2. 
Listens for responses on the stream until 'expectedResponses' count is reached. +// 3. Enforces a 10-second timeout to prevent deadlocks if the server hangs. +// 4. Handles io.EOF gracefully (server closed stream). +func StreamedRequest( + t *testing.T, + client extProcPb.ExternalProcessor_ProcessClient, + requests []*extProcPb.ProcessingRequest, + expectedResponses int, +) ([]*extProcPb.ProcessingResponse, error) { + t.Helper() + + // 1. Send Phase + for _, req := range requests { + t.Logf("Sending request: %v", req) + if err := client.Send(req); err != nil { + t.Logf("Failed to send request: %v", err) + return nil, err + } + } + + // 2. Receive Phase + // We use a channel and a separate goroutine for receiving to allow for a strict timeout via select{}. + type recvResult struct { + res *extProcPb.ProcessingResponse + err error + } + + // Buffered channel avoids blocking the goroutine on the last read. + recvChan := make(chan recvResult, expectedResponses+1) + + // Start reading in background. + go func() { + for range expectedResponses { + res, err := client.Recv() + recvChan <- recvResult{res, err} + if err != nil { + return // Stop reading on error or EOF. + } + } + }() + + var responses []*extProcPb.ProcessingResponse + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + // Collect results with timeout. + for i := range expectedResponses { + select { + case <-ctx.Done(): + t.Logf("Timeout waiting for response %d of %d: %v", i+1, expectedResponses, ctx.Err()) + return responses, fmt.Errorf("timeout waiting for responses: %w", ctx.Err()) + + case result := <-recvChan: + if result.err != nil { + // io.EOF is a valid termination from the server side (e.g. rejection). 
+ if result.err == io.EOF { + return responses, nil + } + t.Logf("Failed to receive: %v", result.err) + return nil, result.err + } + t.Logf("Received response: %+v", result.res) + responses = append(responses, result.res) + } + } + + return responses, nil +} + +// --- System Utilities --- + +// GetFreePort finds an available IPv4 TCP port on localhost. +// It works by asking the OS to allocate a port by listening on port 0, capturing the assigned address, and then +// immediately closing the listener. +// +// Note: There is a theoretical race condition where another process grabs the port between the Close() call and the +// subsequent usage, but this is generally acceptable in hermetic test environments. +func GetFreePort() (*net.TCPAddr, error) { + // Force IPv4 to prevent flakes on dual-stack CI environments + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to listen on a free port: %w", err) } - return []*extProcPb.ProcessingResponse{response} + + // Critical: Close the listener immediately so the caller can bind to it. + defer listener.Close() + + addr, ok := listener.Addr().(*net.TCPAddr) + if !ok { + return nil, errors.New("failed to cast listener address to TCPAddr") + } + return addr, nil } -// makeMetadata creates the dynamic metadata struct that Envoy uses for routing hints. -func makeMetadata(endpoint string) *structpb.Struct { +// --- Internal Helpers --- + +// makeDestinationMetadata helper to construct the Envoy dynamic metadata for routing. 
+func makeDestinationMetadata(endpoint string) *structpb.Struct { return &structpb.Struct{ Fields: map[string]*structpb.Value{ metadata.DestinationEndpointNamespace: { @@ -316,3 +478,7 @@ func makeMetadata(endpoint string) *structpb.Struct { }, } } + +const ( + headerKeyContentLength = "Content-Length" +) From 3b01d7f28f13d95a1493dff0611b62130e9c85ea Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Sat, 6 Dec 2025 02:10:59 +0000 Subject: [PATCH 2/3] test/bbr: add isolated integration test harness Introduces the `BBRHarness` struct to manage isolated BBR server instances, laying the groundwork for parallel integration testing. Features: - **Isolation:** Spawns a dedicated server per test on a random loopback port. - **Helpers:** Provides BBR-specific assertion helpers (e.g., `ExpectBBRHeader`) to reduce protobuf boilerplate. - **Safety:** Ensures proper server shutdown and resource cleanup via `t.Cleanup`. --- test/integration/bbr/harness_test.go | 103 +++++++++++++++++++++++ test/integration/bbr/util_test.go | 121 +++++++++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 test/integration/bbr/harness_test.go create mode 100644 test/integration/bbr/util_test.go diff --git a/test/integration/bbr/harness_test.go b/test/integration/bbr/harness_test.go new file mode 100644 index 000000000..468ce9a42 --- /dev/null +++ b/test/integration/bbr/harness_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package bbr + +import ( + "context" + "fmt" + "strings" + "testing" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" +) + +var logger = logutil.NewTestLogger().V(logutil.VERBOSE) + +// BBRHarness encapsulates the environment for a single isolated BBR test run. +type BBRHarness struct { + t *testing.T + ctx context.Context + Client extProcPb.ExternalProcessor_ProcessClient + + // Internal handles for cleanup + server *runserver.ExtProcServerRunner + grpcConn *grpc.ClientConn +} + +// NewBBRHarness boots up an isolated BBR server on a random port. +// streaming: determines if the BBR server runs in streaming mode or unary/buffered mode. +func NewBBRHarness(t *testing.T, ctx context.Context, streaming bool) *BBRHarness { + t.Helper() + + // 1. Allocate Free Port + tcpAddr, err := integration.GetFreePort() + require.NoError(t, err, "failed to acquire free port for BBR server") + port := tcpAddr.Port + + // 2. Configure BBR Server + // BBR is simpler than EPP; it doesn't need a K8s Manager. + runner := runserver.NewDefaultExtProcServerRunner(port, false) + runner.SecureServing = false + runner.Streaming = streaming + + // 3. Start Server in Background + serverCtx, serverCancel := context.WithCancel(ctx) + go func() { + logger.Info("Starting BBR server", "port", port, "streaming", streaming) + if err := runner.AsRunnable(logger.WithName("bbr-server")).Start(serverCtx); err != nil { + // Context cancellation is expected during teardown. + if !strings.Contains(err.Error(), "context canceled") { + logger.Error(err, "BBR server stopped unexpectedly") + } + } + }() + + // 4. 
Connect Client + // Blocking dial ensures the server is reachable before the test logic begins. + addr := fmt.Sprintf("127.0.0.1:%d", port) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err, "failed to create grpc connection to BBR server") + + extProcClient, err := extProcPb.NewExternalProcessorClient(conn).Process(ctx) + require.NoError(t, err, "failed to initialize ext_proc stream client") + + h := &BBRHarness{ + t: t, + ctx: ctx, + Client: extProcClient, + server: runner, + grpcConn: conn, + } + + // 5. Register Cleanup + t.Cleanup(func() { + logger.Info("Tearing down BBR server", "port", port) + serverCancel() + if err := h.grpcConn.Close(); err != nil { + t.Logf("Warning: failed to close grpc connection: %v", err) + } + }) + + return h +} diff --git a/test/integration/bbr/util_test.go b/test/integration/bbr/util_test.go new file mode 100644 index 000000000..72fc6a574 --- /dev/null +++ b/test/integration/bbr/util_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bbr + +import ( + "encoding/json" + + envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" +) + +// --- Response Expectations (Streaming) --- + +// ExpectBBRHeader asserts that BBR set the specific model header and cleared the route cache. 
+func ExpectBBRHeader(modelName string) *extProcPb.ProcessingResponse { + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + ClearRouteCache: true, + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*envoyCorev3.HeaderValueOption{ + { + Header: &envoyCorev3.HeaderValue{ + Key: "X-Gateway-Model-Name", + RawValue: []byte(modelName), + }, + }, + }, + }, + }, + }, + }, + } +} + +// ExpectBBRBodyPassThrough asserts that BBR reconstructs and passes the body through. +// BBR buffers the body to inspect it, then sends it downstream as a single chunk (usually). +func ExpectBBRBodyPassThrough(prompt, model string) *extProcPb.ProcessingResponse { + j := map[string]any{ + "max_tokens": 100, "prompt": prompt, "temperature": 0, + } + if model != "" { + j["model"] = model + } + b, _ := json.Marshal(j) + + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + BodyMutation: &extProcPb.BodyMutation{ + Mutation: &extProcPb.BodyMutation_StreamedResponse{ + StreamedResponse: &extProcPb.StreamedBodyResponse{ + Body: b, + EndOfStream: true, + }, + }, + }, + }, + }, + }, + } +} + +// ExpectBBRNoOpHeader asserts that BBR did nothing to the headers (e.g., when no model is found). +func ExpectBBRNoOpHeader() *extProcPb.ProcessingResponse { + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{}, + }, + } +} + +// --- Response Expectations (Unary) --- + +// ExpectBBRUnaryResponse creates expected response for unary tests where the body is mutated directly. +func ExpectBBRUnaryResponse(modelName string) *extProcPb.ProcessingResponse { + resp := &extProcPb.ProcessingResponse{} + + // If modelName is present, we expect header mutations. 
+ if modelName != "" { + resp.Response = &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + ClearRouteCache: true, + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*envoyCorev3.HeaderValueOption{ + { + Header: &envoyCorev3.HeaderValue{ + Key: "X-Gateway-Model-Name", + RawValue: []byte(modelName), + }, + }, + }, + }, + }, + }, + } + } else { + // Otherwise, expect a No-Op on the body. + resp.Response = &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{}, + } + } + return resp +} From 69160f8ee2aeac08e63d33f24b6af69f900cd7f2 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Sat, 6 Dec 2025 02:11:51 +0000 Subject: [PATCH 3/3] test/bbr: migrate to harness and enable parallel Refactors `hermetic_test.go` to use the new `BBRHarness` and DSL, reducing boilerplate and execution time. Changes: - Replaces manual gRPC setup with declarative, table-driven test cases. - Enables `t.Parallel()` for all BBR test cases, as the new harness provides full network isolation. - Removes legacy `setUpHermeticServer` helper in favor of the new harness. 
--- test/integration/bbr/hermetic_test.go | 285 ++++++-------------------- 1 file changed, 65 insertions(+), 220 deletions(-) diff --git a/test/integration/bbr/hermetic_test.go b/test/integration/bbr/hermetic_test.go index e1c25a78f..3edcb70f4 100644 --- a/test/integration/bbr/hermetic_test.go +++ b/test/integration/bbr/hermetic_test.go @@ -19,87 +19,67 @@ package bbr import ( "context" - "fmt" "testing" - "time" - configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" - - runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" - integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" ) -var logger = logutil.NewTestLogger().V(logutil.VERBOSE) - +// TestBodyBasedRouting validates the "Unary" (Non-Streaming) behavior of BBR. +// This simulates scenarios where Envoy buffers the body before sending it to ext_proc. 
func TestBodyBasedRouting(t *testing.T) { + t.Parallel() + tests := []struct { - name string - req *extProcPb.ProcessingRequest - wantHeaders []*configPb.HeaderValueOption - wantErr bool + name string + req *extProcPb.ProcessingRequest + wantResponse *extProcPb.ProcessingResponse + wantErr bool }{ { - name: "success adding model parameter to header", - req: integrationutils.GenerateRequest(logger, "test", "llama", nil), - wantHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "X-Gateway-Model-Name", - RawValue: []byte("llama"), - }, - }, - }, - wantErr: false, + name: "success: extracts model and sets header", + req: integration.ReqLLMUnary(logger, "test", "llama"), + wantResponse: ExpectBBRUnaryResponse("llama"), + wantErr: false, }, { - name: "no model parameter", - req: integrationutils.GenerateRequest(logger, "test1", "", nil), - wantHeaders: []*configPb.HeaderValueOption{}, - wantErr: false, + name: "noop: no model parameter in body", + req: integration.ReqLLMUnary(logger, "test1", ""), + wantResponse: ExpectBBRUnaryResponse(""), // Expect no headers. 
+ wantErr: false, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(false) - t.Cleanup(cleanup) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := NewBBRHarness(t, ctx, false) + + res, err := integration.SendRequest(t, h.Client, tc.req) - want := &extProcPb.ProcessingResponse{} - if len(test.wantHeaders) > 0 { - want.Response = &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: test.wantHeaders, - }, - ClearRouteCache: true, - }, - }, - } + if tc.wantErr { + require.Error(t, err, "expected error during request processing") } else { - want.Response = &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{}, - } + require.NoError(t, err, "unexpected error during request processing") } - res, err := integrationutils.SendRequest(t, client, test.req) - if err != nil && !test.wantErr { - t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) - } - if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected response, (-want +got): %v", diff) + if diff := cmp.Diff(tc.wantResponse, res, protocmp.Transform()); diff != "" { + t.Errorf("Response mismatch (-want +got): %v", diff) } }) } } +// TestFullDuplexStreamed_BodyBasedRouting validates the "Streaming" behavior of BBR. +// This validates that BBR correctly buffers streamed chunks, inspects the body, and injects the header. 
func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { + t.Parallel() + tests := []struct { name string reqs []*extProcPb.ProcessingRequest @@ -107,188 +87,53 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { wantErr bool }{ { - name: "success adding model parameter to header", - reqs: integrationutils.GenerateStreamedRequestSet(logger, "test", "foo", "foo", nil), + name: "success: adds model header from simple body", + reqs: integration.ReqLLM(logger, "test", "foo", "bar"), wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_RequestHeaders{ - RequestHeaders: &extProcPb.HeadersResponse{ - Response: &extProcPb.CommonResponse{ - ClearRouteCache: true, - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "X-Gateway-Model-Name", - RawValue: []byte("foo"), - }, - }, - }}, - }, - }, - }, - }, - { - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_StreamedResponse{ - StreamedResponse: &extProcPb.StreamedBodyResponse{ - Body: []byte("{\"max_tokens\":100,\"model\":\"foo\",\"prompt\":\"test\",\"temperature\":0}"), - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, + ExpectBBRHeader("foo"), + ExpectBBRBodyPassThrough("test", "foo"), }, }, { - name: "success adding model parameter to header with multiple body chunks", - reqs: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lo"), EndOfStream: false}, - }, - }, - { - 
Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("ra-sheddable\",\"prompt\":\"test\",\"temperature\":0}"), EndOfStream: true}, - }, - }, - }, + name: "success: buffers split chunks and extracts model", + reqs: integration.ReqRaw( + map[string]string{"hi": "mom"}, + `{"max_tokens":100,"model":"sql-lo`, + `ra-sheddable","prompt":"test","temperature":0}`, + ), wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_RequestHeaders{ - RequestHeaders: &extProcPb.HeadersResponse{ - Response: &extProcPb.CommonResponse{ - ClearRouteCache: true, - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "X-Gateway-Model-Name", - RawValue: []byte("sql-lora-sheddable"), - }, - }, - }}, - }, - }, - }, - }, - { - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_StreamedResponse{ - StreamedResponse: &extProcPb.StreamedBodyResponse{ - Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-sheddable\",\"prompt\":\"test\",\"temperature\":0}"), - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, + ExpectBBRHeader("sql-lora-sheddable"), + ExpectBBRBodyPassThrough("test", "sql-lora-sheddable"), }, }, { - name: "no model parameter", - reqs: integrationutils.GenerateStreamedRequestSet(logger, "test", "", "", nil), + name: "noop: handles missing model field gracefully", + reqs: integration.ReqLLM(logger, "test", "", ""), wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_RequestHeaders{ - RequestHeaders: &extProcPb.HeadersResponse{}, - }, - }, - { - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - BodyMutation: &extProcPb.BodyMutation{ - 
Mutation: &extProcPb.BodyMutation_StreamedResponse{ - StreamedResponse: &extProcPb.StreamedBodyResponse{ - Body: []byte("{\"max_tokens\":100,\"prompt\":\"test\",\"temperature\":0}"), - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, + ExpectBBRNoOpHeader(), + ExpectBBRBodyPassThrough("test", ""), }, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(true) - t.Cleanup(cleanup) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := NewBBRHarness(t, ctx, true) + + responses, err := integration.StreamedRequest(t, h.Client, tc.reqs, len(tc.wantResponses)) - responses, err := integrationutils.StreamedRequest(t, client, test.reqs, len(test.wantResponses)) - if err != nil && !test.wantErr { - t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) + if tc.wantErr { + require.Error(t, err, "expected stream error") + } else { + require.NoError(t, err, "unexpected stream error") } - if diff := cmp.Diff(test.wantResponses, responses, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected response, (-want +got): %v", diff) + if diff := cmp.Diff(tc.wantResponses, responses, protocmp.Transform()); diff != "" { + t.Errorf("Response mismatch (-want +got): %v", diff) } }) } } - -func setUpHermeticServer(streaming bool) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { - port := 9004 - - serverCtx, stopServer := context.WithCancel(context.Background()) - serverRunner := runserver.NewDefaultExtProcServerRunner(port, false) - serverRunner.SecureServing = false - serverRunner.Streaming = streaming - - go func() { - if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil { - logutil.Fatal(logger, err, "Failed to start ext-proc server") - } - }() - - address := fmt.Sprintf("localhost:%v", port) - // Create a grpc connection - conn, err := grpc.NewClient(address, 
grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - logutil.Fatal(logger, err, "Failed to connect", "address", address) - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) - if err != nil { - logutil.Fatal(logger, err, "Failed to create client") - } - return client, func() { - cancel() - conn.Close() - stopServer() - - // wait a little until the goroutines actually exit - time.Sleep(5 * time.Second) - } -}