diff --git a/tools/server/public/index.html.gz b/tools/server/public/index.html.gz index 2db04e95227..c78bc895e20 100644 Binary files a/tools/server/public/index.html.gz and b/tools/server/public/index.html.gz differ diff --git a/tools/server/webui/src/lib/agentic/openai-sse-client.ts b/tools/server/webui/src/lib/agentic/openai-sse-client.ts new file mode 100644 index 00000000000..8f9c2b8ac59 --- /dev/null +++ b/tools/server/webui/src/lib/agentic/openai-sse-client.ts @@ -0,0 +1,190 @@ +import type { + ApiChatCompletionToolCall, + ApiChatCompletionToolCallDelta, + ApiChatCompletionStreamChunk +} from '$lib/types/api'; +import type { ChatMessagePromptProgress, ChatMessageTimings } from '$lib/types/chat'; +import { mergeToolCallDeltas, extractModelName } from '$lib/utils/chat-stream'; +import type { AgenticChatCompletionRequest } from './types'; + +export type OpenAISseCallbacks = { + onChunk?: (chunk: string) => void; + onReasoningChunk?: (chunk: string) => void; + onToolCallChunk?: (serializedToolCalls: string) => void; + onModel?: (model: string) => void; + onFirstValidChunk?: () => void; + onProcessingUpdate?: (timings?: ChatMessageTimings, progress?: ChatMessagePromptProgress) => void; +}; + +export type OpenAISseTurnResult = { + content: string; + reasoningContent?: string; + toolCalls: ApiChatCompletionToolCall[]; + finishReason?: string | null; + timings?: ChatMessageTimings; +}; + +export type OpenAISseClientOptions = { + url: string; + buildHeaders?: () => Record; +}; + +export class OpenAISseClient { + constructor(private readonly options: OpenAISseClientOptions) {} + + async stream( + request: AgenticChatCompletionRequest, + callbacks: OpenAISseCallbacks = {}, + abortSignal?: AbortSignal + ): Promise { + const response = await fetch(this.options.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(this.options.buildHeaders?.() ?? {}) + }, + body: JSON.stringify(request), + signal: abortSignal + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(errorText || `LLM request failed (${response.status})`); + } + + const reader = response.body?.getReader(); + if (!reader) { + throw new Error('LLM response stream is not available'); + } + + return this.consumeStream(reader, callbacks, abortSignal); + } + + private async consumeStream( + reader: ReadableStreamDefaultReader, + callbacks: OpenAISseCallbacks, + abortSignal?: AbortSignal + ): Promise { + const decoder = new TextDecoder(); + let buffer = ''; + let aggregatedContent = ''; + let aggregatedReasoning = ''; + let aggregatedToolCalls: ApiChatCompletionToolCall[] = []; + let hasOpenToolCallBatch = false; + let toolCallIndexOffset = 0; + let finishReason: string | null | undefined; + let lastTimings: ChatMessageTimings | undefined; + let modelEmitted = false; + let firstValidChunkEmitted = false; + + const finalizeToolCallBatch = () => { + if (!hasOpenToolCallBatch) return; + toolCallIndexOffset = aggregatedToolCalls.length; + hasOpenToolCallBatch = false; + }; + + const processToolCalls = (toolCalls?: ApiChatCompletionToolCallDelta[]) => { + if (!toolCalls || toolCalls.length === 0) { + return; + } + aggregatedToolCalls = mergeToolCallDeltas( + aggregatedToolCalls, + toolCalls, + toolCallIndexOffset + ); + if (aggregatedToolCalls.length === 0) { + return; + } + hasOpenToolCallBatch = true; + }; + + try { + while (true) { + if (abortSignal?.aborted) { + throw new DOMException('Aborted', 'AbortError'); + } + + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() ?? ''; + + for (const line of lines) { + if (!line.startsWith('data: ')) { + continue; + } + + const payload = line.slice(6); + if (payload === '[DONE]' || payload.trim().length === 0) { + continue; + } + + let chunk: ApiChatCompletionStreamChunk; + try { + chunk = JSON.parse(payload) as ApiChatCompletionStreamChunk; + } catch (error) { + console.error('[Agentic][SSE] Failed to parse chunk:', error); + continue; + } + + if (!firstValidChunkEmitted && chunk.object === 'chat.completion.chunk') { + firstValidChunkEmitted = true; + callbacks.onFirstValidChunk?.(); + } + + const choice = chunk.choices?.[0]; + const delta = choice?.delta; + finishReason = choice?.finish_reason ?? finishReason; + + if (!modelEmitted) { + const chunkModel = extractModelName(chunk); + if (chunkModel) { + modelEmitted = true; + callbacks.onModel?.(chunkModel); + } + } + + if (chunk.timings || chunk.prompt_progress) { + callbacks.onProcessingUpdate?.(chunk.timings, chunk.prompt_progress); + if (chunk.timings) { + lastTimings = chunk.timings; + } + } + + if (delta?.content) { + finalizeToolCallBatch(); + aggregatedContent += delta.content; + callbacks.onChunk?.(delta.content); + } + + if (delta?.reasoning_content) { + finalizeToolCallBatch(); + aggregatedReasoning += delta.reasoning_content; + callbacks.onReasoningChunk?.(delta.reasoning_content); + } + + processToolCalls(delta?.tool_calls); + } + } + + finalizeToolCallBatch(); + } catch (error) { + if ((error as Error).name === 'AbortError') { + throw error; + } + throw error instanceof Error ? error : new Error('LLM stream error'); + } finally { + reader.releaseLock(); + } + + return { + content: aggregatedContent, + reasoningContent: aggregatedReasoning || undefined, + toolCalls: aggregatedToolCalls, + finishReason, + timings: lastTimings + }; + } +} diff --git a/tools/server/webui/src/lib/agentic/orchestrator.ts b/tools/server/webui/src/lib/agentic/orchestrator.ts new file mode 100644 index 00000000000..67f2b7cb452 --- /dev/null +++ b/tools/server/webui/src/lib/agentic/orchestrator.ts @@ -0,0 +1,255 @@ +import type { + ApiChatCompletionRequest, + ApiChatMessageData, + ApiChatCompletionToolCall +} from '$lib/types/api'; +import type { ChatMessagePromptProgress, ChatMessageTimings } from '$lib/types/chat'; +import type { MCPToolCall } from '$lib/mcp'; +import { MCPClient } from '$lib/mcp'; +import { OpenAISseClient, type OpenAISseTurnResult } from './openai-sse-client'; +import type { AgenticChatCompletionRequest, AgenticMessage, AgenticToolCallList } from './types'; +import { toAgenticMessages } from './types'; + +export type AgenticOrchestratorCallbacks = { + onChunk?: (chunk: string) => void; + onReasoningChunk?: (chunk: string) => void; + onToolCallChunk?: (serializedToolCalls: string) => void; + onModel?: (model: string) => void; + onFirstValidChunk?: () => void; + onComplete?: () => void; + onError?: (error: Error) => void; +}; + +export type AgenticRunParams = { + initialMessages: ApiChatMessageData[]; + requestTemplate: ApiChatCompletionRequest; + callbacks: AgenticOrchestratorCallbacks; + abortSignal?: AbortSignal; + onProcessingUpdate?: (timings?: ChatMessageTimings, progress?: ChatMessagePromptProgress) => void; + maxTurns?: number; + filterReasoningAfterFirstTurn?: boolean; +}; + +export type AgenticOrchestratorOptions = { + mcpClient: MCPClient; + llmClient: OpenAISseClient; + maxTurns: number; + maxToolPreviewLines: number; +}; + +export class AgenticOrchestrator { + private readonly mcpClient: MCPClient; + private readonly llmClient: OpenAISseClient; + private readonly maxTurns: number; + private readonly maxToolPreviewLines: number; + + constructor(options: AgenticOrchestratorOptions) { + this.mcpClient = options.mcpClient; + this.llmClient = options.llmClient; + this.maxTurns = options.maxTurns; + this.maxToolPreviewLines = options.maxToolPreviewLines; + } + + async run(params: AgenticRunParams): Promise { + const baseMessages = toAgenticMessages(params.initialMessages); + const sessionMessages: AgenticMessage[] = [...baseMessages]; + const tools = await this.mcpClient.getToolsDefinition(); + + const requestWithoutMessages = { ...params.requestTemplate }; + delete (requestWithoutMessages as Partial).messages; + const requestBase: AgenticChatCompletionRequest = { + ...(requestWithoutMessages as Omit), + stream: true, + messages: [] + }; + + const maxTurns = params.maxTurns ?? this.maxTurns; + + // Accumulate tool_calls across all turns (not per-turn) + const allToolCalls: ApiChatCompletionToolCall[] = []; + + for (let turn = 0; turn < maxTurns; turn++) { + if (params.abortSignal?.aborted) { + params.callbacks.onComplete?.(); + return; + } + + const llmRequest: AgenticChatCompletionRequest = { + ...requestBase, + messages: sessionMessages, + tools: tools.length > 0 ? tools : undefined + }; + + const shouldFilterReasoningChunks = params.filterReasoningAfterFirstTurn === true && turn > 0; + + let turnResult: OpenAISseTurnResult; + try { + turnResult = await this.llmClient.stream( + llmRequest, + { + onChunk: params.callbacks.onChunk, + onReasoningChunk: shouldFilterReasoningChunks + ? undefined + : params.callbacks.onReasoningChunk, + onModel: params.callbacks.onModel, + onFirstValidChunk: params.callbacks.onFirstValidChunk, + onProcessingUpdate: (timings, progress) => + params.onProcessingUpdate?.(timings, progress) + }, + params.abortSignal + ); + } catch (error) { + // Check if error is due to abort signal (stop button) + if (params.abortSignal?.aborted) { + params.callbacks.onComplete?.(); + return; + } + + const normalizedError = error instanceof Error ? error : new Error('LLM stream error'); + params.callbacks.onError?.(normalizedError); + const errorChunk = `\n\n\`\`\`\nUpstream LLM error:\n${normalizedError.message}\n\`\`\`\n`; + params.callbacks.onChunk?.(errorChunk); + params.callbacks.onComplete?.(); + return; + } + + if ( + turnResult.toolCalls.length === 0 || + (turnResult.finishReason && turnResult.finishReason !== 'tool_calls') + ) { + params.callbacks.onComplete?.(); + return; + } + + const normalizedCalls = this.normalizeToolCalls(turnResult.toolCalls); + if (normalizedCalls.length === 0) { + params.callbacks.onComplete?.(); + return; + } + + // Accumulate tool_calls from this turn + for (const call of normalizedCalls) { + allToolCalls.push({ + id: call.id, + type: call.type, + function: call.function ? { ...call.function } : undefined + }); + } + + // Forward the complete accumulated list + params.callbacks.onToolCallChunk?.(JSON.stringify(allToolCalls)); + + sessionMessages.push({ + role: 'assistant', + content: turnResult.content || undefined, + tool_calls: normalizedCalls + }); + + for (const toolCall of normalizedCalls) { + if (params.abortSignal?.aborted) { + params.callbacks.onComplete?.(); + return; + } + + const result = await this.executeTool(toolCall, params.abortSignal).catch( + (error: Error) => { + // Don't show error for AbortError + if (error.name !== 'AbortError') { + params.callbacks.onError?.(error); + } + return `Error: ${error.message}`; + } + ); + + // Stop silently if aborted during tool execution + if (params.abortSignal?.aborted) { + params.callbacks.onComplete?.(); + return; + } + + this.emitToolPreview(result, params.callbacks.onChunk); + + const contextValue = this.sanitizeToolContent(result); + sessionMessages.push({ + role: 'tool', + tool_call_id: toolCall.id, + content: contextValue + }); + } + } + + params.callbacks.onChunk?.('\n\n```\nTurn limit reached\n```\n'); + params.callbacks.onComplete?.(); + } + + private normalizeToolCalls(toolCalls: ApiChatCompletionToolCall[]): AgenticToolCallList { + if (!toolCalls) { + return []; + } + + return toolCalls.map((call, index) => ({ + id: call?.id ?? `tool_${index}`, + type: (call?.type as 'function') ?? 'function', + function: { + name: call?.function?.name ?? '', + arguments: call?.function?.arguments ?? '' + } + })); + } + + private async executeTool( + toolCall: AgenticToolCallList[number], + abortSignal?: AbortSignal + ): Promise { + const mcpCall: MCPToolCall = { + id: toolCall.id, + function: { + name: toolCall.function.name, + arguments: toolCall.function.arguments + } + }; + + const result = await this.mcpClient.execute(mcpCall, abortSignal); + return result; + } + + private emitToolPreview(result: string, emit?: (chunk: string) => void): void { + if (!emit) return; + const preview = this.createPreview(result); + emit(preview); + } + + private createPreview(result: string): string { + if (this.isBase64Image(result)) { + return `\n![tool-image](${result.trim()})\n`; + } + + const lines = result.split('\n'); + const trimmedLines = + lines.length > this.maxToolPreviewLines ? lines.slice(-this.maxToolPreviewLines) : lines; + const preview = trimmedLines.join('\n'); + return `\n\`\`\`\n${preview}\n\`\`\`\n`; + } + + private sanitizeToolContent(result: string): string { + if (this.isBase64Image(result)) { + return '[Image displayed to user]'; + } + return result; + } + + private isBase64Image(content: string): boolean { + const trimmed = content.trim(); + if (!trimmed.startsWith('data:image/')) { + return false; + } + + const match = trimmed.match(/^data:image\/(png|jpe?g|gif|webp);base64,([A-Za-z0-9+/]+=*)$/); + if (!match) { + return false; + } + + const base64Payload = match[2]; + return base64Payload.length > 0 && base64Payload.length % 4 === 0; + } +} diff --git a/tools/server/webui/src/lib/agentic/types.ts b/tools/server/webui/src/lib/agentic/types.ts new file mode 100644 index 00000000000..c3b61195f4d --- /dev/null +++ b/tools/server/webui/src/lib/agentic/types.ts @@ -0,0 +1,71 @@ +import type { + ApiChatCompletionRequest, + ApiChatMessageContentPart, + ApiChatMessageData +} from '$lib/types/api'; + +export type AgenticToolCallPayload = { + id: string; + type: 'function'; + function: { + name: string; + arguments: string; + }; +}; + +export type AgenticMessage = + | { + role: 'system' | 'user'; + content: string | ApiChatMessageContentPart[]; + } + | { + role: 'assistant'; + content?: string | ApiChatMessageContentPart[]; + tool_calls?: AgenticToolCallPayload[]; + } + | { + role: 'tool'; + tool_call_id: string; + content: string; + }; + +export type AgenticAssistantMessage = Extract; +export type AgenticToolCallList = NonNullable; + +export type AgenticChatCompletionRequest = Omit & { + messages: AgenticMessage[]; + stream: true; + tools?: ApiChatCompletionRequest['tools']; +}; + +export function toAgenticMessages(messages: ApiChatMessageData[]): AgenticMessage[] { + return messages.map((message) => { + if (message.role === 'assistant' && message.tool_calls && message.tool_calls.length > 0) { + return { + role: 'assistant', + content: message.content, + tool_calls: message.tool_calls.map((call, index) => ({ + id: call.id ?? `call_${index}`, + type: (call.type as 'function') ?? 'function', + function: { + name: call.function?.name ?? '', + arguments: call.function?.arguments ?? '' + } + })) + } satisfies AgenticMessage; + } + + if (message.role === 'tool' && message.tool_call_id) { + return { + role: 'tool', + tool_call_id: message.tool_call_id, + content: typeof message.content === 'string' ? message.content : '' + } satisfies AgenticMessage; + } + + return { + role: message.role, + content: message.content + } satisfies AgenticMessage; + }); +} diff --git a/tools/server/webui/src/lib/components/app/chat/ChatSettings/ChatSettings.svelte b/tools/server/webui/src/lib/components/app/chat/ChatSettings/ChatSettings.svelte index 45640e42a01..cb919c83483 100644 --- a/tools/server/webui/src/lib/components/app/chat/ChatSettings/ChatSettings.svelte +++ b/tools/server/webui/src/lib/components/app/chat/ChatSettings/ChatSettings.svelte @@ -9,12 +9,14 @@ Moon, ChevronLeft, ChevronRight, - Database + Database, + Cable } from '@lucide/svelte'; import { ChatSettingsFooter, ChatSettingsImportExportTab, - ChatSettingsFields + ChatSettingsFields, + McpSettingsSection } from '$lib/components/app'; import { ScrollArea } from '$lib/components/ui/scroll-area'; import { config, settingsStore } from '$lib/stores/settings.svelte'; @@ -219,6 +221,27 @@ } ] }, + { + title: 'MCP Client', + icon: Cable, + fields: [ + { + key: 'agenticMaxTurns', + label: 'Agentic loop max turns', + type: 'input' + }, + { + key: 'agenticMaxToolPreviewLines', + label: 'Max lines per tool preview', + type: 'input' + }, + { + key: 'agenticFilterReasoningAfterFirstTurn', + label: 'Filter reasoning after first turn', + type: 'checkbox' + } + ] + }, { title: 'Import/Export', icon: Database, @@ -318,7 +341,9 @@ 'dry_multiplier', 'dry_base', 'dry_allowed_length', - 'dry_penalty_last_n' + 'dry_penalty_last_n', + 'agenticMaxTurns', + 'agenticMaxToolPreviewLines' ]; for (const field of numericFields) { @@ -466,6 +491,16 @@ {#if currentSection.title === 'Import/Export'} + {:else if currentSection.title === 'MCP Client'} +
+ + +
{:else}
+ import { Loader2, Plus, Trash2 } from '@lucide/svelte'; + import { Checkbox } from '$lib/components/ui/checkbox'; + import { Input } from '$lib/components/ui/input'; + import Label from '$lib/components/ui/label/label.svelte'; + import { Button } from '$lib/components/ui/button'; + import { + detectMcpTransportFromUrl, + parseMcpServerSettings, + getDefaultMcpConfig, + type MCPServerSettingsEntry + } from '$lib/config/mcp'; + import { MCPClient } from '$lib/mcp'; + import type { SettingsConfigType } from '$lib/types/settings'; + + interface Props { + localConfig: SettingsConfigType; + onConfigChange: (key: string, value: string | boolean) => void; + } + + let { localConfig, onConfigChange }: Props = $props(); + + const defaultMcpConfig = getDefaultMcpConfig(); + + type HealthCheckState = + | { status: 'idle' } + | { status: 'loading' } + | { status: 'error'; message: string } + | { status: 'success'; tools: { name: string; description?: string }[] }; + + let healthChecks: Record = $state({}); + + function serializeServers(servers: MCPServerSettingsEntry[]) { + onConfigChange('mcpServers', JSON.stringify(servers)); + } + + function getServers(): MCPServerSettingsEntry[] { + return parseMcpServerSettings(localConfig.mcpServers); + } + + function addServer() { + const servers = getServers(); + const newServer: MCPServerSettingsEntry = { + id: crypto.randomUUID ? crypto.randomUUID() : `server-${Date.now()}`, + enabled: true, + url: '', + requestTimeoutSeconds: defaultMcpConfig.requestTimeoutSeconds + }; + + serializeServers([...servers, newServer]); + } + + function updateServer(id: string, updates: Partial) { + const servers = getServers(); + const nextServers = servers.map((server) => + server.id === id + ? { + ...server, + ...updates + } + : server + ); + + serializeServers(nextServers); + } + + function removeServer(id: string) { + const servers = getServers().filter((server) => server.id !== id); + serializeServers(servers); + } + + function getHealthState(id: string): HealthCheckState { + return healthChecks[id] ?? { status: 'idle' }; + } + + function isErrorState(state: HealthCheckState): state is { status: 'error'; message: string } { + return state.status === 'error'; + } + + function isSuccessState( + state: HealthCheckState + ): state is { status: 'success'; tools: { name: string; description?: string }[] } { + return state.status === 'success'; + } + + function setHealthState(id: string, state: HealthCheckState) { + healthChecks = { ...healthChecks, [id]: state }; + } + + async function runHealthCheck(server: MCPServerSettingsEntry) { + const trimmedUrl = server.url.trim(); + + if (!trimmedUrl) { + setHealthState(server.id, { + status: 'error', + message: 'Please enter a server URL before running a health check.' + }); + return; + } + + setHealthState(server.id, { status: 'loading' }); + + const timeoutMs = Math.round(server.requestTimeoutSeconds * 1000); + + const mcpClient = new MCPClient({ + protocolVersion: defaultMcpConfig.protocolVersion, + capabilities: defaultMcpConfig.capabilities, + clientInfo: defaultMcpConfig.clientInfo, + requestTimeoutMs: timeoutMs, + servers: { + [server.id]: { + url: trimmedUrl, + transport: detectMcpTransportFromUrl(trimmedUrl), + handshakeTimeoutMs: defaultMcpConfig.connectionTimeoutMs, + requestTimeoutMs: timeoutMs + } + } + }); + + try { + await mcpClient.initialize(); + const tools = (await mcpClient.getToolsDefinition()).map((tool) => ({ + name: tool.function.name, + description: tool.function.description + })); + + setHealthState(server.id, { status: 'success', tools }); + } catch (error) { + const message = error instanceof Error ? error.message : 'Unknown error occurred'; + setHealthState(server.id, { status: 'error', message }); + } finally { + try { + await mcpClient.shutdown(); + } catch (shutdownError) { + console.warn('[MCP] Failed to cleanly shutdown client', shutdownError); + } + } + } + + +
+
+
+

MCP Servers

+

+ Configure one or more MCP Servers. Only enabled servers with a URL are used. +

+
+ + +
+ + {#if getServers().length === 0} +
+ No MCP Servers configured yet. Add one to enable agentic features. +
+ {/if} + +
+ {#each getServers() as server, index (server.id)} + {@const healthState = getHealthState(server.id)} + +
+
+
+ + updateServer(server.id, { + enabled: Boolean(checked) + })} + /> +
+ +

+ {detectMcpTransportFromUrl(server.url) === 'websocket' + ? 'WebSocket' + : 'Streamable HTTP'} +

+
+
+ +
+ +
+
+ +
+
+ + + updateServer(server.id, { + url: event.currentTarget.value + })} + /> +
+ +
+ +
+ { + const parsed = Number(event.currentTarget.value); + updateServer(server.id, { + requestTimeoutSeconds: + Number.isFinite(parsed) && parsed > 0 + ? parsed + : defaultMcpConfig.requestTimeoutSeconds + }); + }} + /> + + +
+
+
+ + {#if healthState.status !== 'idle'} +
+ {#if healthState.status === 'loading'} +
+ + Running health check... +
+ {:else if isErrorState(healthState)} +

+ Health check failed: {healthState.message} +

+ {:else if isSuccessState(healthState)} + {#if healthState.tools.length === 0} +

No tools returned by this server.

+ {:else} +
+

+ Available tools ({healthState.tools.length}) +

+
    + {#each healthState.tools as tool (tool.name)} +
  • + + {tool.name} + + {tool.description ?? 'No description provided.'} +
  • + {/each} +
+
+ {/if} + {/if} +
+ {/if} +
+ {/each} +
+
diff --git a/tools/server/webui/src/lib/components/app/index.ts b/tools/server/webui/src/lib/components/app/index.ts index 87b24598b72..df51ad18ad2 100644 --- a/tools/server/webui/src/lib/components/app/index.ts +++ b/tools/server/webui/src/lib/components/app/index.ts @@ -33,6 +33,7 @@ export { default as ChatSettingsFooter } from './chat/ChatSettings/ChatSettingsF export { default as ChatSettingsFields } from './chat/ChatSettings/ChatSettingsFields.svelte'; export { default as ChatSettingsImportExportTab } from './chat/ChatSettings/ChatSettingsImportExportTab.svelte'; export { default as ChatSettingsParameterSourceIndicator } from './chat/ChatSettings/ChatSettingsParameterSourceIndicator.svelte'; +export { default as McpSettingsSection } from './chat/ChatSettings/McpSettingsSection.svelte'; export { default as ChatSidebar } from './chat/ChatSidebar/ChatSidebar.svelte'; export { default as ChatSidebarConversationItem } from './chat/ChatSidebar/ChatSidebarConversationItem.svelte'; diff --git a/tools/server/webui/src/lib/config/agentic.ts b/tools/server/webui/src/lib/config/agentic.ts new file mode 100644 index 00000000000..61f3aa96210 --- /dev/null +++ b/tools/server/webui/src/lib/config/agentic.ts @@ -0,0 +1,51 @@ +import { hasEnabledMcpServers } from './mcp'; +import type { SettingsConfigType } from '$lib/types/settings'; + +/** + * Agentic orchestration configuration. + */ +export interface AgenticConfig { + enabled: boolean; + maxTurns: number; + maxToolPreviewLines: number; + filterReasoningAfterFirstTurn: boolean; +} + +const defaultAgenticConfig: AgenticConfig = { + enabled: true, + maxTurns: 100, + maxToolPreviewLines: 25, + filterReasoningAfterFirstTurn: true +}; + +function normalizeNumber(value: unknown, fallback: number): number { + const parsed = typeof value === 'string' ? Number.parseFloat(value) : Number(value); + if (!Number.isFinite(parsed) || parsed <= 0) { + return fallback; + } + + return parsed; +} + +/** + * Gets the current agentic configuration. + * Automatically disables agentic mode if no MCP servers are configured. + */ +export function getAgenticConfig(settings: SettingsConfigType): AgenticConfig { + const maxTurns = normalizeNumber(settings.agenticMaxTurns, defaultAgenticConfig.maxTurns); + const maxToolPreviewLines = normalizeNumber( + settings.agenticMaxToolPreviewLines, + defaultAgenticConfig.maxToolPreviewLines + ); + const filterReasoningAfterFirstTurn = + typeof settings.agenticFilterReasoningAfterFirstTurn === 'boolean' + ? settings.agenticFilterReasoningAfterFirstTurn + : defaultAgenticConfig.filterReasoningAfterFirstTurn; + + return { + enabled: hasEnabledMcpServers(settings) && defaultAgenticConfig.enabled, + maxTurns, + maxToolPreviewLines, + filterReasoningAfterFirstTurn + }; +} diff --git a/tools/server/webui/src/lib/config/mcp.ts b/tools/server/webui/src/lib/config/mcp.ts new file mode 100644 index 00000000000..01e4ede3c50 --- /dev/null +++ b/tools/server/webui/src/lib/config/mcp.ts @@ -0,0 +1,155 @@ +import type { + MCPClientCapabilities, + MCPClientConfig, + MCPClientInfo, + MCPServerConfig +} from '../mcp/types'; +import type { SettingsConfigType } from '$lib/types/settings'; + +/** + * Raw MCP server configuration entry stored in settings. + */ +export type MCPServerSettingsEntry = { + id: string; + enabled: boolean; + url: string; + requestTimeoutSeconds: number; +}; + +const defaultMcpConfig = { + protocolVersion: '2025-06-18', + capabilities: { tools: { listChanged: true } } as MCPClientCapabilities, + clientInfo: { name: 'llama-webui-mcp', version: 'dev' } as MCPClientInfo, + requestTimeoutSeconds: 300, // 5 minutes for long-running tools + connectionTimeoutMs: 10_000 // 10 seconds for connection establishment +}; + +export function getDefaultMcpConfig() { + return defaultMcpConfig; +} + +export function detectMcpTransportFromUrl(url: string): 'websocket' | 'streamable_http' { + const normalized = url.trim().toLowerCase(); + return normalized.startsWith('ws://') || normalized.startsWith('wss://') + ? 'websocket' + : 'streamable_http'; +} + +function normalizeRequestTimeoutSeconds(value: unknown, fallback: number): number { + const parsed = typeof value === 'string' ? Number.parseFloat(value) : Number(value); + if (!Number.isFinite(parsed) || parsed <= 0) { + return fallback; + } + + return parsed; +} + +function sanitizeId(id: unknown, index: number): string { + if (typeof id === 'string' && id.trim()) { + return id.trim(); + } + + return `server-${index + 1}`; +} + +function sanitizeUrl(url: unknown): string { + if (typeof url === 'string') { + return url.trim(); + } + + return ''; +} + +export function parseMcpServerSettings( + rawServers: unknown, + fallbackRequestTimeoutSeconds = defaultMcpConfig.requestTimeoutSeconds +): MCPServerSettingsEntry[] { + if (!rawServers) return []; + + let parsed: unknown; + if (typeof rawServers === 'string') { + const trimmed = rawServers.trim(); + if (!trimmed) return []; + + try { + parsed = JSON.parse(trimmed); + } catch (error) { + console.warn('[MCP] Failed to parse mcpServers JSON, ignoring value:', error); + return []; + } + } else { + parsed = rawServers; + } + + if (!Array.isArray(parsed)) return []; + + return parsed.map((entry, index) => { + const requestTimeoutSeconds = normalizeRequestTimeoutSeconds( + (entry as { requestTimeoutSeconds?: unknown })?.requestTimeoutSeconds, + fallbackRequestTimeoutSeconds + ); + + const url = sanitizeUrl((entry as { url?: unknown })?.url); + + return { + id: sanitizeId((entry as { id?: unknown })?.id, index), + enabled: Boolean((entry as { enabled?: unknown })?.enabled), + url, + requestTimeoutSeconds + } satisfies MCPServerSettingsEntry; + }); +} + +function buildServerConfig( + entry: MCPServerSettingsEntry, + connectionTimeoutMs = defaultMcpConfig.connectionTimeoutMs +): MCPServerConfig | undefined { + if (!entry?.url) { + return undefined; + } + + return { + url: entry.url, + transport: detectMcpTransportFromUrl(entry.url), + handshakeTimeoutMs: connectionTimeoutMs, + requestTimeoutMs: Math.round(entry.requestTimeoutSeconds * 1000) + }; +} + +/** + * Builds MCP client configuration from settings. + * Returns undefined if no valid servers are configured. + */ +export function buildMcpClientConfig(config: SettingsConfigType): MCPClientConfig | undefined { + const rawServers = parseMcpServerSettings(config.mcpServers); + + if (!rawServers.length) { + return undefined; + } + + const servers: Record = {}; + for (const [index, entry] of rawServers.entries()) { + if (!entry.enabled) continue; + + const normalized = buildServerConfig(entry); + if (normalized) { + servers[sanitizeId(entry.id, index)] = normalized; + } + } + + if (Object.keys(servers).length === 0) { + return undefined; + } + + return { + protocolVersion: defaultMcpConfig.protocolVersion, + capabilities: defaultMcpConfig.capabilities, + clientInfo: defaultMcpConfig.clientInfo, + requestTimeoutMs: Math.round(defaultMcpConfig.requestTimeoutSeconds * 1000), + servers + }; +} + +export function hasEnabledMcpServers(config: SettingsConfigType): boolean { + return Boolean(buildMcpClientConfig(config)); +} diff --git a/tools/server/webui/src/lib/constants/settings-config.ts b/tools/server/webui/src/lib/constants/settings-config.ts index 3764a2856b5..555adf47a18 100644 --- a/tools/server/webui/src/lib/constants/settings-config.ts +++ b/tools/server/webui/src/lib/constants/settings-config.ts @@ -16,6 +16,10 @@ export const SETTING_CONFIG_DEFAULT: Record = disableAutoScroll: false, renderUserContentAsMarkdown: false, autoMicOnEmpty: false, + mcpServers: '[]', + agenticMaxTurns: 10, + agenticMaxToolPreviewLines: 25, + agenticFilterReasoningAfterFirstTurn: true, // make sure these default values are in sync with `common.h` samplers: 'top_k;typ_p;top_p;min_p;temperature', temperature: 0.8, @@ -98,6 +102,14 @@ export const SETTING_CONFIG_INFO: Record = { renderUserContentAsMarkdown: 'Render user messages using markdown formatting in the chat.', autoMicOnEmpty: 'Automatically show microphone button instead of send button when textarea is empty for models with audio modality support.', + mcpServers: + 'Configure MCP servers as a JSON list. Use the form in the MCP Client settings section to edit.', + agenticMaxTurns: + 'Maximum number of tool execution cycles before stopping (prevents infinite loops).', + agenticMaxToolPreviewLines: + 'Number of lines shown in tool output previews (last N lines). Only these previews and the final LLM response persist after the agentic loop completes.', + agenticFilterReasoningAfterFirstTurn: + 'Only show reasoning from the first agentic turn. When disabled, reasoning from all turns is merged in one (WebUI limitation).', pyInterpreterEnabled: 'Enable Python interpreter using Pyodide. Allows running Python code in markdown code blocks.', enableContinueGeneration: diff --git a/tools/server/webui/src/lib/mcp/client.ts b/tools/server/webui/src/lib/mcp/client.ts new file mode 100644 index 00000000000..4f514b4107c --- /dev/null +++ b/tools/server/webui/src/lib/mcp/client.ts @@ -0,0 +1,413 @@ +import { getDefaultMcpConfig } from '$lib/config/mcp'; +import { JsonRpcProtocol } from './protocol'; +import type { + JsonRpcMessage, + MCPClientConfig, + MCPServerCapabilities, + MCPServerConfig, + MCPToolCall, + MCPToolDefinition, + MCPToolsCallResult +} from './types'; +import { MCPError } from './types'; +import type { MCPTransport } from './transports/types'; +import { WebSocketTransport } from './transports/websocket'; +import { StreamableHttpTransport } from './transports/streamable-http'; + +const MCP_DEFAULTS = getDefaultMcpConfig(); + +interface PendingRequest { + resolve: (value: Record) => void; + reject: (reason?: unknown) => void; + timeout: ReturnType; +} + +interface ServerState { + transport: MCPTransport; + pending: Map; + requestId: number; + tools: MCPToolDefinition[]; + requestTimeoutMs?: number; + capabilities?: MCPServerCapabilities; + protocolVersion?: string; +} + +export class MCPClient { + private readonly servers: Map = new Map(); + private readonly toolsToServer: Map = new Map(); + private readonly config: MCPClientConfig; + + constructor(config: MCPClientConfig) { + if (!config?.servers || Object.keys(config.servers).length === 0) { + throw new Error('MCPClient requires at least one server configuration'); + } + this.config = config; + } + + async initialize(): Promise { + const entries = Object.entries(this.config.servers); + await Promise.all( + entries.map(([name, serverConfig]) => this.initializeServer(name, serverConfig)) + ); + } + + listTools(): string[] { + return Array.from(this.toolsToServer.keys()); + } + + async getToolsDefinition(): Promise< + { + type: 'function'; + function: { name: string; description?: string; parameters: Record }; + }[] + > { + const tools: { + type: 'function'; + function: { name: string; description?: string; parameters: Record }; + }[] = []; + + for (const [, server] of this.servers) { + for (const tool of server.tools) { + tools.push({ + type: 'function', + function: { + name: tool.name, + description: tool.description, + parameters: tool.inputSchema ?? { + type: 'object', + properties: {}, + required: [] + } + } + }); + } + } + + return tools; + } + + async execute(toolCall: MCPToolCall, abortSignal?: AbortSignal): Promise { + const toolName = toolCall.function.name; + const serverName = this.toolsToServer.get(toolName); + if (!serverName) { + throw new MCPError(`Unknown tool: ${toolName}`, -32601); + } + + if (abortSignal?.aborted) { + throw new DOMException('Aborted', 'AbortError'); + } + + let args: Record; + const originalArgs = toolCall.function.arguments; + if (typeof originalArgs === 'string') { + const trimmed = originalArgs.trim(); + if (trimmed === '') { + args = {}; + } else { + try { + const parsed = JSON.parse(trimmed); + if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) { + throw new MCPError( + `Tool arguments must be an object, got ${Array.isArray(parsed) ? 'array' : typeof parsed}`, + -32602 + ); + } + args = parsed as Record; + } catch (error) { + if (error instanceof MCPError) { + throw error; + } + throw new MCPError( + `Failed to parse tool arguments as JSON: ${(error as Error).message}`, + -32700 + ); + } + } + } else if ( + typeof originalArgs === 'object' && + originalArgs !== null && + !Array.isArray(originalArgs) + ) { + args = originalArgs as Record; + } else { + throw new MCPError(`Invalid tool arguments type: ${typeof originalArgs}`, -32602); + } + + const response = await this.call( + serverName, + 'tools/call', + { + name: toolName, + arguments: args + }, + abortSignal + ); + + return MCPClient.formatToolResult(response as MCPToolsCallResult); + } + + async shutdown(): Promise { + for (const [, state] of this.servers) { + await state.transport.stop(); + } + this.servers.clear(); + this.toolsToServer.clear(); + } + + private async initializeServer(name: string, config: MCPServerConfig): Promise { + const protocolVersion = this.config.protocolVersion ?? MCP_DEFAULTS.protocolVersion; + const transport = this.createTransport(config, protocolVersion); + await transport.start(); + + const state: ServerState = { + transport, + pending: new Map(), + requestId: 0, + tools: [], + requestTimeoutMs: config.requestTimeoutMs + }; + + transport.onMessage((message) => this.handleMessage(name, message)); + this.servers.set(name, state); + + const clientInfo = this.config.clientInfo ?? MCP_DEFAULTS.clientInfo; + const capabilities = + config.capabilities ?? this.config.capabilities ?? MCP_DEFAULTS.capabilities; + + const initResult = await this.call(name, 'initialize', { + protocolVersion, + capabilities, + clientInfo + }); + + const negotiatedVersion = (initResult?.protocolVersion as string) ?? protocolVersion; + + state.capabilities = (initResult?.capabilities as MCPServerCapabilities) ?? {}; + state.protocolVersion = negotiatedVersion; + + const notification = JsonRpcProtocol.createNotification('notifications/initialized'); + await state.transport.send(notification as JsonRpcMessage); + + await this.refreshTools(name); + } + + private createTransport(config: MCPServerConfig, protocolVersion: string): MCPTransport { + if (!config.url) { + throw new Error('MCP server configuration is missing url'); + } + + const transportType = config.transport ?? 'websocket'; + + if (transportType === 'streamable_http') { + return new StreamableHttpTransport({ + url: config.url, + headers: config.headers, + credentials: config.credentials, + protocolVersion, + sessionId: config.sessionId + }); + } + + if (transportType !== 'websocket') { + throw new Error(`Unsupported transport "${transportType}" in webui environment`); + } + + return new WebSocketTransport({ + url: config.url, + protocols: config.protocols, + handshakeTimeoutMs: config.handshakeTimeoutMs + }); + } + + private async refreshTools(serverName: string): Promise { + const state = this.servers.get(serverName); + if (!state) return; + + const response = await this.call(serverName, 'tools/list'); + const tools = (response.tools as MCPToolDefinition[]) ?? []; + state.tools = tools; + + for (const [tool, owner] of Array.from(this.toolsToServer.entries())) { + if (owner === serverName && !tools.find((t) => t.name === tool)) { + this.toolsToServer.delete(tool); + } + } + + for (const tool of tools) { + this.toolsToServer.set(tool.name, serverName); + } + } + + private call( + serverName: string, + method: string, + params?: Record, + abortSignal?: AbortSignal + ): Promise> { + const state = this.servers.get(serverName); + if (!state) { + return Promise.reject(new MCPError(`Server ${serverName} is not connected`, -32000)); + } + + const id = ++state.requestId; + const message = JsonRpcProtocol.createRequest(id, method, params); + + const timeoutDuration = + state.requestTimeoutMs ?? + this.config.requestTimeoutMs ?? + MCP_DEFAULTS.requestTimeoutSeconds * 1000; + + if (abortSignal?.aborted) { + return Promise.reject(new DOMException('Aborted', 'AbortError')); + } + + return new Promise((resolve, reject) => { + const cleanupTasks: Array<() => void> = []; + const cleanup = () => { + for (const task of cleanupTasks.splice(0)) { + task(); + } + }; + + const timeout = setTimeout(() => { + cleanup(); + reject(new Error(`Timeout while waiting for ${method} response from ${serverName}`)); + }, timeoutDuration); + cleanupTasks.push(() => clearTimeout(timeout)); + cleanupTasks.push(() => state.pending.delete(id)); + + if (abortSignal) { + const abortHandler = () => { + cleanup(); + reject(new DOMException('Aborted', 'AbortError')); + }; + abortSignal.addEventListener('abort', abortHandler, { once: true }); + cleanupTasks.push(() => abortSignal.removeEventListener('abort', abortHandler)); + } + + state.pending.set(id, { + resolve: (value) => { + cleanup(); + resolve(value); + }, + reject: (reason) => { + cleanup(); + reject(reason); + }, + timeout + }); + + const handleSendError = (error: unknown) => { + cleanup(); + reject(error); + }; + + try { + void state.transport + .send(message as JsonRpcMessage) + .catch((error) => handleSendError(error)); + } catch (error) { + handleSendError(error); + } + }); + } + + private handleMessage(serverName: string, message: JsonRpcMessage): void { + const state = this.servers.get(serverName); + if (!state) { + return; + } + + if ('method' in message && !('id' in message)) { + this.handleNotification(serverName, message.method, message.params); + return; + } + + const response = JsonRpcProtocol.parseResponse(message); + if (!response) { + return; + } + + const pending = state.pending.get(response.id as number); + if (!pending) { + return; + } + + state.pending.delete(response.id as number); + clearTimeout(pending.timeout); + + if (response.error) { + pending.reject( + new MCPError(response.error.message, response.error.code, response.error.data) + ); + return; + } + + pending.resolve(response.result ?? {}); + } + + private handleNotification( + serverName: string, + method: string, + params?: Record + ): void { + if (method === 'notifications/tools/list_changed') { + void this.refreshTools(serverName).catch((error) => { + console.error(`[MCP] Failed to refresh tools for ${serverName}:`, error); + }); + } else if (method === 'notifications/logging/message' && params) { + console.debug(`[MCP][${serverName}]`, params); + } + } + + private static formatToolResult(result: MCPToolsCallResult): string { + const content = result.content; + if (Array.isArray(content)) { + return content + .map((item) => MCPClient.formatSingleContent(item)) + .filter(Boolean) + .join('\n'); + } + if (content) { + return MCPClient.formatSingleContent(content); + } + if (result.result !== undefined) { + return typeof result.result === 'string' ? result.result : JSON.stringify(result.result); + } + return ''; + } + + private static formatSingleContent(content: unknown): string { + if (content === null || content === undefined) { + return ''; + } + + if (typeof content === 'string') { + return content; + } + + if (typeof content === 'object') { + const typed = content as { + type?: string; + text?: string; + data?: string; + mimeType?: string; + resource?: unknown; + }; + if (typed.type === 'text' && typeof typed.text === 'string') { + return typed.text; + } + if (typed.type === 'image' && typeof typed.data === 'string' && typed.mimeType) { + return `data:${typed.mimeType};base64,${typed.data}`; + } + if (typed.type === 'resource' && typed.resource) { + return JSON.stringify(typed.resource); + } + if (typeof typed.text === 'string') { + return typed.text; + } + } + + return JSON.stringify(content); + } +} diff --git a/tools/server/webui/src/lib/mcp/index.ts b/tools/server/webui/src/lib/mcp/index.ts new file mode 100644 index 00000000000..14d11859c4d --- /dev/null +++ b/tools/server/webui/src/lib/mcp/index.ts @@ -0,0 +1,3 @@ +export { MCPClient } from './client'; +export { MCPError } from './types'; +export type { MCPClientConfig, MCPServerConfig, MCPToolCall } from './types'; diff --git a/tools/server/webui/src/lib/mcp/protocol.ts b/tools/server/webui/src/lib/mcp/protocol.ts new file mode 100644 index 00000000000..eb161041e5a --- /dev/null +++ b/tools/server/webui/src/lib/mcp/protocol.ts @@ -0,0 +1,46 @@ +import type { + JsonRpcId, + JsonRpcMessage, + JsonRpcNotification, + JsonRpcRequest, + JsonRpcResponse +} from './types'; + +export class JsonRpcProtocol { + static createRequest( + id: JsonRpcId, + method: string, + params?: Record + ): JsonRpcRequest { + return { + jsonrpc: '2.0', + id, + method, + ...(params ? { params } : {}) + }; + } + + static createNotification(method: string, params?: Record): JsonRpcNotification { + return { + jsonrpc: '2.0', + method, + ...(params ? { params } : {}) + }; + } + + static parseResponse(message: JsonRpcMessage): JsonRpcResponse | null { + if (!message || typeof message !== 'object') { + return null; + } + + if ((message as JsonRpcResponse).jsonrpc !== '2.0') { + return null; + } + + if (!('id' in message)) { + return null; + } + + return message as JsonRpcResponse; + } +} diff --git a/tools/server/webui/src/lib/mcp/transports/streamable-http.ts b/tools/server/webui/src/lib/mcp/transports/streamable-http.ts new file mode 100644 index 00000000000..dc9321c1529 --- /dev/null +++ b/tools/server/webui/src/lib/mcp/transports/streamable-http.ts @@ -0,0 +1,129 @@ +import type { JsonRpcMessage } from '$lib/mcp/types'; +import type { MCPTransport } from './types'; + +export type StreamableHttpTransportOptions = { + url: string; + headers?: Record; + credentials?: RequestCredentials; + protocolVersion?: string; + sessionId?: string; +}; + +export class StreamableHttpTransport implements MCPTransport { + private handler: ((message: JsonRpcMessage) => void) | null = null; + private activeSessionId: string | undefined; + + constructor(private readonly options: StreamableHttpTransportOptions) {} + + async start(): Promise { + this.activeSessionId = this.options.sessionId ?? undefined; + } + + async stop(): Promise {} + + async send(message: JsonRpcMessage): Promise { + return this.dispatch(message); + } + + onMessage(handler: (message: JsonRpcMessage) => void): void { + this.handler = handler; + } + + private async dispatch(message: JsonRpcMessage): Promise { + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + ...(this.options.headers ?? {}) + }; + + if (this.activeSessionId) { + headers['Mcp-Session-Id'] = this.activeSessionId; + } + + if (this.options.protocolVersion) { + headers['MCP-Protocol-Version'] = this.options.protocolVersion; + } + + const credentialsOption = + this.options.credentials ?? (this.activeSessionId ? 'include' : 'same-origin'); + const response = await fetch(this.options.url, { + method: 'POST', + headers, + body: JSON.stringify(message), + credentials: credentialsOption + }); + + const sessionHeader = response.headers.get('mcp-session-id'); + if (sessionHeader) { + this.activeSessionId = sessionHeader; + } + + if (!response.ok) { + const errorBody = await response.text().catch(() => ''); + throw new Error( + `Failed to send MCP request over Streamable HTTP (${response.status} ${response.statusText}): ${errorBody}` + ); + } + + const contentType = response.headers.get('content-type') ?? ''; + + if (contentType.includes('application/json')) { + const payload = (await response.json()) as JsonRpcMessage; + this.handler?.(payload); + return; + } + + if (contentType.includes('text/event-stream') && response.body) { + const reader = response.body.getReader(); + await this.consume(reader); + return; + } + + if (response.status >= 400) { + const bodyText = await response.text().catch(() => ''); + throw new Error( + `Unexpected MCP Streamable HTTP response (${response.status}): ${bodyText || 'no body'}` + ); + } + } + + private async consume(reader: ReadableStreamDefaultReader): Promise { + const decoder = new TextDecoder('utf-8'); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + + const parts = buffer.split('\n\n'); + buffer = parts.pop() ?? ''; + + for (const part of parts) { + if (!part.startsWith('data: ')) { + continue; + } + const payload = part.slice(6); + if (!payload || payload === '[DONE]') { + continue; + } + + try { + const message = JSON.parse(payload) as JsonRpcMessage; + this.handler?.(message); + } catch (error) { + console.error('[MCP][Streamable HTTP] Failed to parse JSON payload:', error); + } + } + } + } catch (error) { + if ((error as Error)?.name === 'AbortError') { + return; + } + throw error; + } finally { + reader.releaseLock(); + } + } +} diff --git a/tools/server/webui/src/lib/mcp/transports/types.ts b/tools/server/webui/src/lib/mcp/transports/types.ts new file mode 100644 index 00000000000..c0182d5b492 --- /dev/null +++ b/tools/server/webui/src/lib/mcp/transports/types.ts @@ -0,0 +1,8 @@ +import type { JsonRpcMessage } from '../types'; + +export interface MCPTransport { + start(): Promise; + stop(): Promise; + send(message: JsonRpcMessage): Promise; + onMessage(handler: (message: JsonRpcMessage) => void): void; +} diff --git a/tools/server/webui/src/lib/mcp/transports/websocket.ts b/tools/server/webui/src/lib/mcp/transports/websocket.ts new file mode 100644 index 00000000000..f40aa941b6a --- /dev/null +++ b/tools/server/webui/src/lib/mcp/transports/websocket.ts @@ -0,0 +1,238 @@ +import type { JsonRpcMessage } from '$lib/mcp/types'; +import type { MCPTransport } from './types'; + +export type WebSocketTransportOptions = { + url: string; + protocols?: string | string[]; + handshakeTimeoutMs?: number; +}; + +export type TransportMessageHandler = (message: JsonRpcMessage) => void; + +function ensureWebSocket(): typeof WebSocket | null { + if (typeof WebSocket !== 'undefined') { + return WebSocket; + } + return null; +} + +function arrayBufferToString(buffer: ArrayBufferLike): string { + return new TextDecoder('utf-8').decode(new Uint8Array(buffer)); +} + +async function normalizePayload(data: unknown): Promise { + if (typeof data === 'string') { + return data; + } + + if (data instanceof ArrayBuffer) { + return arrayBufferToString(data); + } + + if (ArrayBuffer.isView(data)) { + return arrayBufferToString(data.buffer); + } + + if (typeof Blob !== 'undefined' && data instanceof Blob) { + return await data.text(); + } + + throw new Error('Unsupported WebSocket message payload type'); +} + +export class WebSocketTransport implements MCPTransport { + private socket: WebSocket | null = null; + private handler: TransportMessageHandler | null = null; + private openPromise: Promise | null = null; + private reconnectAttempts = 0; + private readonly maxReconnectAttempts = 5; + private readonly reconnectDelay = 1_000; + private isReconnecting = false; + private shouldAttemptReconnect = true; + + constructor(private readonly options: WebSocketTransportOptions) {} + + start(): Promise { + if (this.openPromise) { + return this.openPromise; + } + + this.shouldAttemptReconnect = true; + + this.openPromise = new Promise((resolve, reject) => { + const WebSocketImpl = ensureWebSocket(); + if (!WebSocketImpl) { + this.openPromise = null; + reject(new Error('WebSocket is not available in this environment')); + return; + } + + let handshakeTimeout: ReturnType | undefined; + const socket = this.options.protocols + ? new WebSocketImpl(this.options.url, this.options.protocols) + : new WebSocketImpl(this.options.url); + + const cleanup = () => { + if (!socket) return; + socket.onopen = null; + socket.onclose = null; + socket.onerror = null; + socket.onmessage = null; + if (handshakeTimeout) { + clearTimeout(handshakeTimeout); + handshakeTimeout = undefined; + } + }; + + const fail = (error: unknown) => { + cleanup(); + this.openPromise = null; + reject(error instanceof Error ? error : new Error('WebSocket connection error')); + }; + + socket.onopen = () => { + cleanup(); + this.socket = socket; + this.reconnectAttempts = 0; + this.attachMessageHandler(); + this.attachCloseHandler(socket); + resolve(); + this.openPromise = null; + }; + + socket.onerror = (event) => { + const error = event instanceof Event ? new Error('WebSocket connection error') : event; + fail(error); + }; + + socket.onclose = (event) => { + if (!this.socket) { + fail(new Error(`WebSocket closed before opening (code: ${event.code})`)); + } + }; + + if (this.options.handshakeTimeoutMs) { + handshakeTimeout = setTimeout(() => { + if (!this.socket) { + try { + socket.close(); + } catch (error) { + console.warn('[MCP][Transport] Failed to close socket after timeout:', error); + } + fail(new Error('WebSocket handshake timed out')); + } + }, this.options.handshakeTimeoutMs); + } + }); + + return this.openPromise; + } + + async send(message: JsonRpcMessage): Promise { + if (!this.socket || this.socket.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket transport is not connected'); + } + this.socket.send(JSON.stringify(message)); + } + + async stop(): Promise { + this.shouldAttemptReconnect = false; + this.reconnectAttempts = 0; + this.isReconnecting = false; + + const socket = this.socket; + if (!socket) { + this.openPromise = null; + return; + } + + await new Promise((resolve) => { + const onClose = () => { + socket.removeEventListener('close', onClose); + resolve(); + }; + socket.addEventListener('close', onClose); + try { + socket.close(); + } catch (error) { + socket.removeEventListener('close', onClose); + console.warn('[MCP][Transport] Failed to close WebSocket:', error); + resolve(); + } + }); + + this.socket = null; + this.openPromise = null; + } + + onMessage(handler: TransportMessageHandler): void { + this.handler = handler; + this.attachMessageHandler(); + } + + private attachMessageHandler(): void { + if (!this.socket) { + return; + } + + this.socket.onmessage = (event: MessageEvent) => { + const payload = event.data; + void (async () => { + try { + const text = await normalizePayload(payload); + const parsed = JSON.parse(text); + this.handler?.(parsed as JsonRpcMessage); + } catch (error) { + console.error('[MCP][Transport] Failed to handle message:', error); + } + })(); + }; + } + + private attachCloseHandler(socket: WebSocket): void { + socket.onclose = (event) => { + this.socket = null; + + if (event.code === 1000 || !this.shouldAttemptReconnect) { + return; + } + + console.warn('[MCP][WebSocket] Connection closed unexpectedly, attempting reconnect'); + void this.reconnect(); + }; + } + + private async reconnect(): Promise { + if ( + this.isReconnecting || + this.reconnectAttempts >= this.maxReconnectAttempts || + !this.shouldAttemptReconnect + ) { + return; + } + + this.isReconnecting = true; + this.reconnectAttempts++; + + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + await new Promise((resolve) => setTimeout(resolve, delay)); + + try { + this.openPromise = null; + await this.start(); + this.reconnectAttempts = 0; + console.log('[MCP][WebSocket] Reconnected successfully'); + } catch (error) { + console.error('[MCP][WebSocket] Reconnection failed:', error); + } finally { + this.isReconnecting = false; + if ( + !this.socket && + this.shouldAttemptReconnect && + this.reconnectAttempts < this.maxReconnectAttempts + ) { + void this.reconnect(); + } + } + } +} diff --git a/tools/server/webui/src/lib/mcp/types.ts b/tools/server/webui/src/lib/mcp/types.ts new file mode 100644 index 00000000000..41b0c391b97 --- /dev/null +++ b/tools/server/webui/src/lib/mcp/types.ts @@ -0,0 +1,124 @@ +export type JsonRpcId = number | string; + +export type JsonRpcRequest = { + jsonrpc: '2.0'; + id: JsonRpcId; + method: string; + params?: Record; +}; + +export type JsonRpcNotification = { + jsonrpc: '2.0'; + method: string; + params?: Record; +}; + +export type JsonRpcError = { + code: number; + message: string; + data?: unknown; +}; + +export type JsonRpcResponse = { + jsonrpc: '2.0'; + id: JsonRpcId; + result?: Record; + error?: JsonRpcError; +}; + +export type JsonRpcMessage = JsonRpcRequest | JsonRpcResponse | JsonRpcNotification; + +export class MCPError extends Error { + code: number; + data?: unknown; + + constructor(message: string, code: number, data?: unknown) { + super(message); + this.name = 'MCPError'; + this.code = code; + this.data = data; + } +} + +export type MCPToolInputSchema = Record; + +export type MCPToolDefinition = { + name: string; + description?: string; + inputSchema?: MCPToolInputSchema; +}; + +export type MCPServerCapabilities = Record; + +export type MCPClientCapabilities = Record; + +export type MCPTransportType = 'websocket' | 'streamable_http'; + +export type MCPServerConfig = { + /** MCP transport type. Defaults to `streamable_http`. */ + transport?: MCPTransportType; + /** Remote MCP endpoint URL. */ + url: string; + /** Optional WebSocket subprotocol(s). */ + protocols?: string | string[]; + /** Optional HTTP headers for environments that support them. */ + headers?: Record; + /** Optional credentials policy for fetch-based transports. */ + credentials?: RequestCredentials; + /** Optional handshake timeout override (ms). */ + handshakeTimeoutMs?: number; + /** Optional per-server request timeout override (ms). */ + requestTimeoutMs?: number; + /** Optional per-server capability overrides. */ + capabilities?: MCPClientCapabilities; + /** Optional pre-negotiated session identifier for Streamable HTTP transport. */ + sessionId?: string; +}; + +export type MCPClientInfo = { + name: string; + version?: string; +}; + +export type MCPClientConfig = { + servers: Record; + /** Defaults to `2025-06-18`. */ + protocolVersion?: string; + /** Default capabilities advertised during initialize. */ + capabilities?: MCPClientCapabilities; + /** Custom client info to advertise. */ + clientInfo?: MCPClientInfo; + /** Request timeout when waiting for MCP responses (ms). Default: 30_000. */ + requestTimeoutMs?: number; +}; + +export type MCPToolCallArguments = Record; + +export type MCPToolCall = { + id: string; + function: { + name: string; + arguments: string | MCPToolCallArguments; + }; +}; + +export type MCPToolResultContent = + | string + | { + type: 'text'; + text: string; + } + | { + type: 'image'; + data: string; + mimeType?: string; + } + | { + type: 'resource'; + resource: Record; + }; + +export type MCPToolsCallResult = { + content?: MCPToolResultContent | MCPToolResultContent[]; + result?: unknown; +}; diff --git a/tools/server/webui/src/lib/services/chat.ts b/tools/server/webui/src/lib/services/chat.ts index c03b764419f..22eef191f5a 100644 --- a/tools/server/webui/src/lib/services/chat.ts +++ b/tools/server/webui/src/lib/services/chat.ts @@ -1,5 +1,10 @@ -import { getJsonHeaders } from '$lib/utils'; +import { getAuthHeaders, getJsonHeaders } from '$lib/utils'; import { AttachmentType } from '$lib/enums'; +import { config } from '$lib/stores/settings.svelte'; +import { ensureMcpClient } from '$lib/services/mcp-singleton'; +import { getAgenticConfig } from '$lib/config/agentic'; +import { AgenticOrchestrator } from '$lib/agentic/orchestrator'; +import { OpenAISseClient } from '$lib/agentic/openai-sse-client'; /** * ChatService - Low-level API communication layer for Chat Completions @@ -169,6 +174,71 @@ export class ChatService { } } + // MCP agentic orchestration (low-coupling mode) + // Check if MCP client is available and agentic mode is enabled + if (stream) { + const mcpClient = await ensureMcpClient(); + const agenticConfig = mcpClient ? getAgenticConfig(config()) : undefined; + + // Debug: verify MCP tools are available + if (mcpClient) { + const availableTools = mcpClient.listTools(); + console.log( + `[MCP] Client initialized with ${availableTools.length} tools:`, + availableTools + ); + } else { + console.log('[MCP] No MCP client available'); + } + + if (mcpClient && agenticConfig?.enabled) { + try { + const llmClient = new OpenAISseClient({ + url: './v1/chat/completions', + buildHeaders: () => getAuthHeaders() + }); + + const orchestrator = new AgenticOrchestrator({ + mcpClient, + llmClient, + maxTurns: agenticConfig.maxTurns, + maxToolPreviewLines: agenticConfig.maxToolPreviewLines + }); + + let capturedTimings: ChatMessageTimings | undefined; + + await orchestrator.run({ + initialMessages: normalizedMessages, + requestTemplate: requestBody, + callbacks: { + onChunk, + onReasoningChunk, + onToolCallChunk, + onModel, + onComplete: onComplete + ? () => onComplete('', undefined, capturedTimings, undefined) + : undefined, + onError + }, + abortSignal: signal, + onProcessingUpdate: (timings, progress) => { + ChatService.notifyTimings(timings, progress, onTimings); + if (timings) { + capturedTimings = timings; + } + }, + maxTurns: agenticConfig.maxTurns, + filterReasoningAfterFirstTurn: agenticConfig.filterReasoningAfterFirstTurn + }); + + return; + } catch (error) { + // If MCP orchestration fails, log and fall through to standard flow + console.warn('MCP orchestration failed, falling back to standard flow:', error); + } + } + } + try { const response = await fetch(`./v1/chat/completions`, { method: 'POST', diff --git a/tools/server/webui/src/lib/services/mcp-singleton.ts b/tools/server/webui/src/lib/services/mcp-singleton.ts new file mode 100644 index 00000000000..4d91c08740a --- /dev/null +++ b/tools/server/webui/src/lib/services/mcp-singleton.ts @@ -0,0 +1,140 @@ +import { browser } from '$app/environment'; +import { MCPClient } from '$lib/mcp'; +import { buildMcpClientConfig } from '$lib/config/mcp'; +import { config } from '$lib/stores/settings.svelte'; + +const globalState = globalThis as typeof globalThis & { + __llamaMcpClient?: MCPClient; + __llamaMcpInitPromise?: Promise; + __llamaMcpConfigSignature?: string; + __llamaMcpInitConfigSignature?: string; +}; + +function serializeConfigSignature(): string | undefined { + const mcpConfig = buildMcpClientConfig(config()); + return mcpConfig ? JSON.stringify(mcpConfig) : undefined; +} + +async function shutdownClient(): Promise { + if (!globalState.__llamaMcpClient) return; + + const clientToShutdown = globalState.__llamaMcpClient; + globalState.__llamaMcpClient = undefined; + globalState.__llamaMcpConfigSignature = undefined; + + try { + await clientToShutdown.shutdown(); + } catch (error) { + console.error('[MCP] Failed to shutdown client:', error); + } +} + +async function bootstrapClient( + signature: string, + mcpConfig: ReturnType +): Promise { + if (!browser || !mcpConfig) { + return undefined; + } + + const client = new MCPClient(mcpConfig); + globalState.__llamaMcpInitConfigSignature = signature; + + const initPromise = client + .initialize() + .then(() => { + // Ignore initialization if config changed during bootstrap + if (globalState.__llamaMcpInitConfigSignature !== signature) { + void client.shutdown().catch((shutdownError) => { + console.error( + '[MCP] Failed to shutdown stale client after config change:', + shutdownError + ); + }); + return undefined; + } + + globalState.__llamaMcpClient = client; + globalState.__llamaMcpConfigSignature = signature; + return client; + }) + .catch((error) => { + console.error('[MCP] Failed to initialize client:', error); + + // Cleanup global references on error + if (globalState.__llamaMcpClient === client) { + globalState.__llamaMcpClient = undefined; + } + if (globalState.__llamaMcpConfigSignature === signature) { + globalState.__llamaMcpConfigSignature = undefined; + } + + void client.shutdown().catch((shutdownError) => { + console.error('[MCP] Failed to shutdown client after init error:', shutdownError); + }); + return undefined; + }) + .finally(() => { + // Clear init promise only if it's OUR promise + if (globalState.__llamaMcpInitPromise === initPromise) { + globalState.__llamaMcpInitPromise = undefined; + // Clear init signature only if it's still ours + if (globalState.__llamaMcpInitConfigSignature === signature) { + globalState.__llamaMcpInitConfigSignature = undefined; + } + } + }); + + globalState.__llamaMcpInitPromise = initPromise; + return initPromise; +} + +export function getMcpClient(): MCPClient | undefined { + return globalState.__llamaMcpClient; +} + +export async function ensureMcpClient(): Promise { + const signature = serializeConfigSignature(); + + // Configuration removed: shut down active client if present + if (!signature) { + // Wait for any in-flight init to complete before shutdown + if (globalState.__llamaMcpInitPromise) { + await globalState.__llamaMcpInitPromise; + } + await shutdownClient(); + globalState.__llamaMcpInitPromise = undefined; + globalState.__llamaMcpInitConfigSignature = undefined; + return undefined; + } + + // Client already initialized with correct config + if (globalState.__llamaMcpClient && globalState.__llamaMcpConfigSignature === signature) { + return globalState.__llamaMcpClient; + } + + // Init in progress with correct config + if ( + globalState.__llamaMcpInitPromise && + globalState.__llamaMcpInitConfigSignature === signature + ) { + return globalState.__llamaMcpInitPromise; + } + + // Config changed - wait for in-flight init before shutdown + if ( + globalState.__llamaMcpInitPromise && + globalState.__llamaMcpInitConfigSignature !== signature + ) { + await globalState.__llamaMcpInitPromise; + } + + // Shutdown if config changed + if (globalState.__llamaMcpConfigSignature !== signature) { + await shutdownClient(); + } + + // Bootstrap new client + const mcpConfig = buildMcpClientConfig(config()); + return bootstrapClient(signature, mcpConfig); +} diff --git a/tools/server/webui/src/lib/types/api.d.ts b/tools/server/webui/src/lib/types/api.d.ts index 4bc92b57bcd..8d725e4649f 100644 --- a/tools/server/webui/src/lib/types/api.d.ts +++ b/tools/server/webui/src/lib/types/api.d.ts @@ -1,6 +1,17 @@ import type { ServerModelStatus, ServerRole } from '$lib/enums'; import type { ChatMessagePromptProgress } from './chat'; +export interface ApiChatCompletionToolFunction { + name: string; + description?: string; + parameters: Record; +} + +export interface ApiChatCompletionTool { + type: 'function'; + function: ApiChatCompletionToolFunction; +} + export interface ApiChatMessageContentPart { type: 'text' | 'image_url' | 'input_audio'; text?: string; @@ -34,6 +45,8 @@ export interface ApiErrorResponse { export interface ApiChatMessageData { role: ChatRole; content: string | ApiChatMessageContentPart[]; + tool_calls?: ApiChatCompletionToolCall[]; + tool_call_id?: string; timestamp?: number; } @@ -185,6 +198,7 @@ export interface ApiChatCompletionRequest { }>; stream?: boolean; model?: string; + tools?: ApiChatCompletionTool[]; // Reasoning parameters reasoning_format?: string; // Generation parameters @@ -243,6 +257,7 @@ export interface ApiChatCompletionStreamChunk { model?: string; tool_calls?: ApiChatCompletionToolCallDelta[]; }; + finish_reason?: string | null; }>; timings?: { prompt_n?: number; @@ -263,8 +278,9 @@ export interface ApiChatCompletionResponse { content: string; reasoning_content?: string; model?: string; - tool_calls?: ApiChatCompletionToolCallDelta[]; + tool_calls?: ApiChatCompletionToolCall[]; }; + finish_reason?: string | null; }>; } diff --git a/tools/server/webui/src/lib/utils/chat-stream.ts b/tools/server/webui/src/lib/utils/chat-stream.ts new file mode 100644 index 00000000000..44145348b2c --- /dev/null +++ b/tools/server/webui/src/lib/utils/chat-stream.ts @@ -0,0 +1,85 @@ +import type { + ApiChatCompletionResponse, + ApiChatCompletionStreamChunk, + ApiChatCompletionToolCall, + ApiChatCompletionToolCallDelta +} from '$lib/types/api'; + +export function mergeToolCallDeltas( + existing: ApiChatCompletionToolCall[], + deltas: ApiChatCompletionToolCallDelta[], + indexOffset = 0 +): ApiChatCompletionToolCall[] { + const result = existing.map((call) => ({ + ...call, + function: call.function ? { ...call.function } : undefined + })); + + for (const delta of deltas) { + const index = + typeof delta.index === 'number' && delta.index >= 0 + ? delta.index + indexOffset + : result.length; + + while (result.length <= index) { + result.push({ function: undefined }); + } + + const target = result[index]!; + + if (delta.id) { + target.id = delta.id; + } + + if (delta.type) { + target.type = delta.type; + } + + if (delta.function) { + const fn = target.function ? { ...target.function } : {}; + + if (delta.function.name) { + fn.name = delta.function.name; + } + + if (delta.function.arguments) { + fn.arguments = (fn.arguments ?? '') + delta.function.arguments; + } + + target.function = fn; + } + } + + return result; +} + +export function extractModelName( + data: ApiChatCompletionStreamChunk | ApiChatCompletionResponse | unknown +): string | undefined { + const asRecord = (value: unknown): Record | undefined => { + return typeof value === 'object' && value !== null + ? (value as Record) + : undefined; + }; + + const getTrimmedString = (value: unknown): string | undefined => { + return typeof value === 'string' && value.trim() ? value.trim() : undefined; + }; + + const root = asRecord(data); + if (!root) return undefined; + + const rootModel = getTrimmedString(root.model); + if (rootModel) return rootModel; + + const firstChoice = Array.isArray(root.choices) ? asRecord(root.choices[0]) : undefined; + if (!firstChoice) return undefined; + + const deltaModel = getTrimmedString(asRecord(firstChoice.delta)?.model); + if (deltaModel) return deltaModel; + + const messageModel = getTrimmedString(asRecord(firstChoice.message)?.model); + if (messageModel) return messageModel; + + return undefined; +}