Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ check-api-key:
buildx-create:
docker buildx inspect $(BUILDX_BUILDER_NAME) 2>&1 > /dev/null || \
docker buildx create --name $(BUILDX_BUILDER_NAME) --platform linux/amd64,linux/arm64 --driver docker-container --use --driver-opt network=host || true
docker buildx use $(BUILDX_BUILDER_NAME) || true

.PHONY: build-all # for test purpose build all but output to /dev/null
build-all: BUILD_ARGS ?= --progress=plain --builder $(BUILDX_BUILDER_NAME) --platform linux/amd64,linux/arm64 --output type=tar,dest=/dev/null
Expand Down
142 changes: 127 additions & 15 deletions go/internal/controller/translator/agent/adk_api_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"maps"
"net/url"
"os"
"slices"
"strconv"
Expand Down Expand Up @@ -73,18 +74,20 @@ type AdkApiTranslator interface {

type TranslatorPlugin = translator.TranslatorPlugin

func NewAdkApiTranslator(kube client.Client, defaultModelConfig types.NamespacedName, plugins []TranslatorPlugin) AdkApiTranslator {
func NewAdkApiTranslator(kube client.Client, defaultModelConfig types.NamespacedName, plugins []TranslatorPlugin, globalProxyURL string) AdkApiTranslator {
return &adkApiTranslator{
kube: kube,
defaultModelConfig: defaultModelConfig,
plugins: plugins,
globalProxyURL: globalProxyURL,
}
}

type adkApiTranslator struct {
kube client.Client
defaultModelConfig types.NamespacedName
plugins []TranslatorPlugin
globalProxyURL string
}

const MAX_DEPTH = 10
Expand Down Expand Up @@ -532,7 +535,8 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al
// Skip tools that are not applicable to the model provider
switch {
case tool.McpServer != nil:
err := a.translateMCPServerTarget(ctx, cfg, agent.Namespace, tool.McpServer, tool.HeadersFrom)
// Use proxy for MCP server/tool communication
err := a.translateMCPServerTarget(ctx, cfg, agent.Namespace, tool.McpServer, tool.HeadersFrom, a.globalProxyURL)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -555,15 +559,36 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al

switch toolAgent.Spec.Type {
case v1alpha2.AgentType_BYO, v1alpha2.AgentType_Declarative:
url := fmt.Sprintf("http://%s.%s:8080", toolAgent.Name, toolAgent.Namespace)
originalURL := fmt.Sprintf("http://%s.%s:8080", toolAgent.Name, toolAgent.Namespace)
headers, err := tool.ResolveHeaders(ctx, a.kube, agent.Namespace)
if err != nil {
return nil, nil, nil, err
}

// If proxy is configured, use proxy URL and set X-Host header for Gateway API routing
targetURL := originalURL
if a.globalProxyURL != "" {
// Parse original URL to extract path and hostname
originalURLParsed, err := url.Parse(originalURL)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse agent URL %q: %w", originalURL, err)
}
proxyURLParsed, err := url.Parse(a.globalProxyURL)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse proxy URL %q: %w", a.globalProxyURL, err)
}
// Use proxy URL with original path
targetURL = fmt.Sprintf("%s://%s%s", proxyURLParsed.Scheme, proxyURLParsed.Host, originalURLParsed.Path)
// Set X-Host header to original hostname (without port) for Gateway API routing
if headers == nil {
headers = make(map[string]string)
}
headers["X-Host"] = originalURLParsed.Hostname()
}

cfg.RemoteAgents = append(cfg.RemoteAgents, adk.RemoteAgentConfig{
Name: utils.ConvertToPythonIdentifier(utils.GetObjectRef(toolAgent)),
Url: url,
Url: targetURL,
Headers: headers,
Description: toolAgent.Spec.Description,
})
Expand Down Expand Up @@ -921,14 +946,35 @@ func (a *adkApiTranslator) translateModel(ctx context.Context, namespace, modelC
return nil, nil, nil, fmt.Errorf("unknown model provider: %s", model.Spec.Provider)
}

func (a *adkApiTranslator) translateStreamableHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string) (*adk.StreamableHTTPConnectionParams, error) {
func (a *adkApiTranslator) translateStreamableHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string, proxyURL string) (*adk.StreamableHTTPConnectionParams, error) {
headers, err := tool.ResolveHeaders(ctx, a.kube, namespace)
if err != nil {
return nil, err
}

// If proxy is configured, use proxy URL and set X-Host header for Gateway API routing
targetURL := tool.URL
if proxyURL != "" {
// Parse original URL to extract path and hostname
originalURL, err := url.Parse(tool.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse tool URL %q: %w", tool.URL, err)
}
proxyURLParsed, err := url.Parse(proxyURL)
if err != nil {
return nil, fmt.Errorf("failed to parse proxy URL %q: %w", proxyURL, err)
}
// Use proxy URL with original path
targetURL = fmt.Sprintf("%s://%s%s", proxyURLParsed.Scheme, proxyURLParsed.Host, originalURL.Path)
// Set X-Host header to original hostname (without port) for Gateway API routing
if headers == nil {
headers = make(map[string]string)
}
headers["X-Host"] = originalURL.Hostname()
}

params := &adk.StreamableHTTPConnectionParams{
Url: tool.URL,
Url: targetURL,
Headers: headers,
}
if tool.Timeout != nil {
Expand All @@ -943,14 +989,35 @@ func (a *adkApiTranslator) translateStreamableHttpTool(ctx context.Context, tool
return params, nil
}

func (a *adkApiTranslator) translateSseHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string) (*adk.SseConnectionParams, error) {
func (a *adkApiTranslator) translateSseHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string, proxyURL string) (*adk.SseConnectionParams, error) {
headers, err := tool.ResolveHeaders(ctx, a.kube, namespace)
if err != nil {
return nil, err
}

// If proxy is configured, use proxy URL and set X-Host header for Gateway API routing
targetURL := tool.URL
if proxyURL != "" {
// Parse original URL to extract path and hostname
originalURL, err := url.Parse(tool.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse tool URL %q: %w", tool.URL, err)
}
proxyURLParsed, err := url.Parse(proxyURL)
if err != nil {
return nil, fmt.Errorf("failed to parse proxy URL %q: %w", proxyURL, err)
}
// Use proxy URL with original path
targetURL = fmt.Sprintf("%s://%s%s", proxyURLParsed.Scheme, proxyURLParsed.Host, originalURL.Path)
// Set X-Host header to original hostname (without port) for Gateway API routing
if headers == nil {
headers = make(map[string]string)
}
headers["X-Host"] = originalURL.Hostname()
}

params := &adk.SseConnectionParams{
Url: tool.URL,
Url: targetURL,
Headers: headers,
}
if tool.Timeout != nil {
Expand All @@ -962,7 +1029,7 @@ func (a *adkApiTranslator) translateSseHttpTool(ctx context.Context, tool *v1alp
return params, nil
}

func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, toolServer *v1alpha2.McpServerTool, toolHeaders []v1alpha2.ValueRef) error {
func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, toolServer *v1alpha2.McpServerTool, toolHeaders []v1alpha2.ValueRef, proxyURL string) error {
gvk := toolServer.GroupKind()

switch gvk {
Expand Down Expand Up @@ -993,7 +1060,7 @@ func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *

spec.HeadersFrom = append(spec.HeadersFrom, toolHeaders...)

return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames)
return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames, proxyURL)
case schema.GroupKind{
Group: "",
Kind: "RemoteMCPServer",
Expand All @@ -1011,7 +1078,13 @@ func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *

remoteMcpServer.Spec.HeadersFrom = append(remoteMcpServer.Spec.HeadersFrom, toolHeaders...)

return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, &remoteMcpServer.Spec, toolServer.ToolNames)
// RemoteMCPServer uses user-supplied URLs, but if the URL points to an internal k8s service,
// apply proxy to route through the gateway
proxyURL := ""
if a.globalProxyURL != "" && a.isInternalK8sURL(ctx, remoteMcpServer.Spec.URL, agentNamespace) {
proxyURL = a.globalProxyURL
}
return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, &remoteMcpServer.Spec, toolServer.ToolNames, proxyURL)
case schema.GroupKind{
Group: "",
Kind: "Service",
Expand All @@ -1034,7 +1107,7 @@ func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *

spec.HeadersFrom = append(spec.HeadersFrom, toolHeaders...)

return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames)
return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames, proxyURL)

default:
return fmt.Errorf("unknown tool server type: %s", gvk)
Expand Down Expand Up @@ -1099,10 +1172,10 @@ func ConvertMCPServerToRemoteMCPServer(mcpServer *v1alpha1.MCPServer) (*v1alpha2
}, nil
}

func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, remoteMcpServer *v1alpha2.RemoteMCPServerSpec, toolNames []string) error {
func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, remoteMcpServer *v1alpha2.RemoteMCPServerSpec, toolNames []string, proxyURL string) error {
switch remoteMcpServer.Protocol {
case v1alpha2.RemoteMCPServerProtocolSse:
tool, err := a.translateSseHttpTool(ctx, remoteMcpServer, agentNamespace)
tool, err := a.translateSseHttpTool(ctx, remoteMcpServer, agentNamespace, proxyURL)
if err != nil {
return err
}
Expand All @@ -1111,7 +1184,7 @@ func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, a
Tools: toolNames,
})
default:
tool, err := a.translateStreamableHttpTool(ctx, remoteMcpServer, agentNamespace)
tool, err := a.translateStreamableHttpTool(ctx, remoteMcpServer, agentNamespace, proxyURL)
if err != nil {
return err
}
Expand All @@ -1125,6 +1198,45 @@ func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, a

// Helper functions

// isInternalK8sURL checks if a URL points to an internal Kubernetes service.
// Internal k8s URLs follow the pattern: http://{name}.{namespace}:{port} or
// http://{name}.{namespace}.svc.cluster.local:{port}
// This method checks if the namespace exists in the cluster to determine if it's internal.
func (a *adkApiTranslator) isInternalK8sURL(ctx context.Context, urlStr, namespace string) bool {
parsedURL, err := url.Parse(urlStr)
if err != nil {
return false
}

hostname := parsedURL.Hostname()
if hostname == "" {
return false
}

// Check if it ends with .svc.cluster.local (definitely internal)
if strings.HasSuffix(hostname, ".svc.cluster.local") {
return true
}

// Extract namespace from hostname pattern: {name}.{namespace}
// Examples: test-mcp-server.kagent -> namespace is "kagent"
parts := strings.Split(hostname, ".")
if len(parts) == 2 {
potentialNamespace := parts[1]

// Check if this namespace exists in the cluster
ns := &corev1.Namespace{}
err := a.kube.Get(ctx, types.NamespacedName{Name: potentialNamespace}, ns)
if err == nil {
// Namespace exists, so this is an internal k8s URL
return true
}
// If namespace doesn't exist, it's likely a TLD or external domain
}

return false
}

func computeConfigHash(agentCfg, agentCard, secretData []byte) uint64 {
hasher := sha256.New()
hasher.Write(agentCfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

"github.com/kagent-dev/kagent/go/api/v1alpha2"
translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent"
"github.com/kagent-dev/kmcp/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -28,6 +31,7 @@ type TestInput struct {
Operation string `yaml:"operation"` // "translateAgent", "translateTeam", "translateToolServer"
TargetObject string `yaml:"targetObject"` // name of the object to translate
Namespace string `yaml:"namespace"`
ProxyURL string `yaml:"proxyURL,omitempty"` // Optional proxy URL for internally-built k8s URLs
}

// TestGoldenAdkTranslator runs golden tests for the ADK API translator
Expand Down Expand Up @@ -74,20 +78,61 @@ func runGoldenTest(t *testing.T, inputFile, outputsDir, testName string, updateG
scheme := schemev1.Scheme
err = v1alpha2.AddToScheme(scheme)
require.NoError(t, err)
err = v1alpha1.AddToScheme(scheme)
require.NoError(t, err)

// Convert map objects to unstructured and then to typed objects
clientBuilder := fake.NewClientBuilder().WithScheme(scheme)

// Track namespaces we've seen to add them to the fake client
namespacesSeen := make(map[string]bool)

for _, objMap := range testInput.Objects {
// Convert map to unstructured
unstrObj := &unstructured.Unstructured{Object: objMap}

// Track namespace if present
if metadata, ok := objMap["metadata"].(map[string]any); ok {
if ns, ok := metadata["namespace"].(string); ok && ns != "" {
namespacesSeen[ns] = true
}
}

// Extract namespace from URLs in RemoteMCPServer specs
if kind, ok := objMap["kind"].(string); ok && kind == "RemoteMCPServer" {
if spec, ok := objMap["spec"].(map[string]any); ok {
if url, ok := spec["url"].(string); ok {
// Parse URL to extract namespace (e.g., http://service.namespace:port/path)
parts := strings.Split(url, "://")
if len(parts) == 2 {
hostPart := strings.Split(parts[1], "/")[0]
hostParts := strings.Split(hostPart, ":")
hostname := hostParts[0]
hostnameParts := strings.Split(hostname, ".")
if len(hostnameParts) == 2 {
namespacesSeen[hostnameParts[1]] = true
}
}
}
}
}

// Convert to typed object
typedObj, err := convertUnstructuredToTyped(unstrObj, scheme)
require.NoError(t, err)
clientBuilder = clientBuilder.WithObjects(typedObj)
}

// Add namespaces to fake client so namespace existence checks work
for nsName := range namespacesSeen {
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: nsName,
},
}
clientBuilder = clientBuilder.WithObjects(ns)
}

kubeClient := clientBuilder.Build()

// Create translator with a default model config that points to the first ModelConfig in the objects
Expand Down Expand Up @@ -119,7 +164,9 @@ func runGoldenTest(t *testing.T, inputFile, outputsDir, testName string, updateG
}, agent)
require.NoError(t, err)

result, err = translator.NewAdkApiTranslator(kubeClient, defaultModel, nil).TranslateAgent(ctx, agent)
// Use proxy URL from test input if provided
proxyURL := testInput.ProxyURL
result, err = translator.NewAdkApiTranslator(kubeClient, defaultModel, nil, proxyURL).TranslateAgent(ctx, agent)
require.NoError(t, err)

default:
Expand Down
Loading