From 5f88b1d4a43d0ddc5f0a159900a058402f38b9f3 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Wed, 19 Nov 2025 18:21:49 +0100 Subject: [PATCH 01/15] Updated `Transport.send` method signature across all implementations to include `TransportSendOptions`. Added support for optional resumption tokens and progress callbacks in request handling. # Conflicts: # kotlin-sdk-server/api/kotlin-sdk-server.api --- kotlin-sdk-core/api/kotlin-sdk-core.api | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/kotlin-sdk-core/api/kotlin-sdk-core.api b/kotlin-sdk-core/api/kotlin-sdk-core.api index 8694e2d5..0158259a 100644 --- a/kotlin-sdk-core/api/kotlin-sdk-core.api +++ b/kotlin-sdk-core/api/kotlin-sdk-core.api @@ -454,12 +454,19 @@ public final class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolKt { } public class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions { - public synthetic fun (ZJILkotlin/jvm/internal/DefaultConstructorMarker;)V - public synthetic fun (ZJLkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun ()V + public fun (ZLjava/util/List;)V + public synthetic fun (ZLjava/util/List;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Z + public final fun component2 ()Ljava/util/List; + public fun copy (ZLjava/util/List;)Lio/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions; + public static synthetic fun copy$default (Lio/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions;ZLjava/util/List;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions; + public fun equals (Ljava/lang/Object;)Z + public final fun getDebouncedNotificationMethods ()Ljava/util/List; public final fun getEnforceStrictCapabilities ()Z - public final fun getTimeout-UwyO8pc ()J + public fun hashCode ()I public final fun setEnforceStrictCapabilities (Z)V - public final fun setTimeout-LRDsOJo (J)V + public fun toString ()Ljava/lang/String; } public final class io/modelcontextprotocol/kotlin/sdk/shared/ReadBuffer { From c41c2271345db9a1f64f3cf31657cc9cebf91866 Mon Sep 17 00:00:00 2001 From: Xiangyu Zhang Date: Mon, 25 Aug 2025 16:22:01 +0800 Subject: [PATCH 02/15] Add Streamable Http Transport --- kotlin-sdk-server/api/kotlin-sdk-server.api | 31 + .../kotlin/sdk/server/KtorServer.kt | 130 ++++ .../server/StreamableHttpServerTransport.kt | 575 ++++++++++++++++++ 3 files changed, 736 insertions(+) create mode 100644 kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index 8016d3bc..3693cc40 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -1,7 +1,20 @@ +public final class io/modelcontextprotocol/kotlin/sdk/LibVersionKt { + public static final field LIB_VERSION Ljava/lang/String; +} + +public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore { + public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class io/modelcontextprotocol/kotlin/sdk/server/KtorServerKt { public static final fun mcp (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V public static final fun mcp (Lio/ktor/server/routing/Routing;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V public static final fun mcp (Lio/ktor/server/routing/Routing;Lkotlin/jvm/functions/Function1;)V + public static final fun mcpStatelessStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V + public static synthetic fun mcpStatelessStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V + public static final fun mcpStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V + public static synthetic fun mcpStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V } public final class io/modelcontextprotocol/kotlin/sdk/server/RegisteredPrompt : io/modelcontextprotocol/kotlin/sdk/server/Feature { @@ -147,6 +160,24 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { + public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String; + public fun ()V + public fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;)V + public synthetic fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun getSessionId ()Ljava/lang/String; + public final fun handleDeleteRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V + public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V + public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V + public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt { public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function0;)V diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt index 2a2f950a..a2911fa4 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt @@ -4,6 +4,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.http.HttpStatusCode import io.ktor.server.application.Application import io.ktor.server.application.install +import io.ktor.server.request.header import io.ktor.server.response.respond import io.ktor.server.routing.Routing import io.ktor.server.routing.RoutingContext @@ -13,6 +14,7 @@ import io.ktor.server.routing.routing import io.ktor.server.sse.SSE import io.ktor.server.sse.ServerSSESession import io.ktor.server.sse.sse +import io.ktor.util.collections.ConcurrentMap import io.ktor.utils.io.KtorDsl import kotlinx.atomicfu.AtomicRef import kotlinx.atomicfu.atomic @@ -20,6 +22,7 @@ import kotlinx.atomicfu.update import kotlinx.collections.immutable.PersistentMap import kotlinx.collections.immutable.toPersistentMap import kotlinx.coroutines.awaitCancellation +import io.modelcontextprotocol.kotlin.sdk.ErrorCode private val logger = KotlinLogging.logger {} @@ -69,6 +72,51 @@ public fun Application.mcp(block: ServerSSESession.() -> Server) { } } +@KtorDsl +public fun Application.mcpStreamableHttp( + enableDnsRebindingProtection: Boolean = false, + allowedHosts: List? = null, + allowedOrigins: List? = null, + eventStore: EventStore? = null, + block: RoutingContext.() -> Server, +) { + val transports = ConcurrentMap() + + routing { + post("/mcp") { + mcpStreamableHttpEndpoint( + transports, + enableDnsRebindingProtection, + allowedHosts, + allowedOrigins, + eventStore, + block, + ) + } + } +} + +@KtorDsl +public fun Application.mcpStatelessStreamableHttp( + enableDnsRebindingProtection: Boolean = false, + allowedHosts: List? = null, + allowedOrigins: List? = null, + eventStore: EventStore? = null, + block: RoutingContext.() -> Server, +) { + routing { + post("/mcp") { + mcpStatelessStreamableHttpEndpoint( + enableDnsRebindingProtection, + allowedHosts, + allowedOrigins, + eventStore, + block, + ) + } + } +} + internal suspend fun ServerSSESession.mcpSseEndpoint( postEndpoint: String, sseTransportManager: SseTransportManager, @@ -101,6 +149,88 @@ internal fun ServerSSESession.mcpSseTransport( return transport } +internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( + transports: ConcurrentMap, + enableDnsRebindingProtection: Boolean = false, + allowedHosts: List? = null, + allowedOrigins: List? = null, + eventStore: EventStore? = null, + block: RoutingContext.() -> Server, +) { + val sessionId = this.call.request.header(MCP_SESSION_ID_HEADER) + val transport = if (sessionId != null && transports.containsKey(sessionId)) { + transports[sessionId]!! + } else if (sessionId == null) { + val transport = StreamableHttpServerTransport( + enableDnsRebindingProtection = enableDnsRebindingProtection, + allowedHosts = allowedHosts, + allowedOrigins = allowedOrigins, + eventStore = eventStore, + enableJsonResponse = true, + ) + + transport.setOnSessionInitialized { sessionId -> + transports[sessionId] = transport + + logger.info { "New StreamableHttp connection established and stored with sessionId: $sessionId" } + } + + val server = block() + server.onClose { + logger.info { "Server connection closed for sessionId: ${transport.sessionId}" } + } + + server.connect(transport) + + transport + } else { + null + } + + if (transport == null) { + this.call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Unknown(-32000), + "Bad Request: No valid session ID provided", + ) + return + } + + transport.handleRequest(null, this.call) + logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" } +} + +internal suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint( + enableDnsRebindingProtection: Boolean = false, + allowedHosts: List? = null, + allowedOrigins: List? = null, + eventStore: EventStore? = null, + block: RoutingContext.() -> Server, +) { + val transport = StreamableHttpServerTransport( + enableDnsRebindingProtection = enableDnsRebindingProtection, + allowedHosts = allowedHosts, + allowedOrigins = allowedOrigins, + eventStore = eventStore, + enableJsonResponse = true, + ) + transport.setSessionIdGenerator(null) + + logger.info { "New stateless StreamableHttp connection established without sessionId" } + + val server = block() + + server.onClose { + logger.info { "Server connection closed without sessionId" } + } + + server.connect(transport) + + transport.handleRequest(null, this.call) + + logger.debug { "Server connected to transport without sessionId" } +} + internal suspend fun RoutingContext.mcpPostEndpoint(sseTransportManager: SseTransportManager) { val sessionId: String = call.request.queryParameters["sessionId"] ?: run { call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided") diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt new file mode 100644 index 00000000..af765a87 --- /dev/null +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -0,0 +1,575 @@ +package io.modelcontextprotocol.kotlin.sdk.server + +import io.ktor.http.ContentType +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpMethod +import io.ktor.http.HttpStatusCode +import io.ktor.server.application.ApplicationCall +import io.ktor.server.request.contentType +import io.ktor.server.request.header +import io.ktor.server.request.host +import io.ktor.server.request.httpMethod +import io.ktor.server.request.receiveText +import io.ktor.server.response.header +import io.ktor.server.response.respond +import io.ktor.server.response.respondNullable +import io.ktor.server.sse.ServerSSESession +import io.ktor.util.collections.ConcurrentMap +import io.modelcontextprotocol.kotlin.sdk.ErrorCode +import io.modelcontextprotocol.kotlin.sdk.JSONRPCError +import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage +import io.modelcontextprotocol.kotlin.sdk.JSONRPCRequest +import io.modelcontextprotocol.kotlin.sdk.JSONRPCResponse +import io.modelcontextprotocol.kotlin.sdk.LATEST_PROTOCOL_VERSION +import io.modelcontextprotocol.kotlin.sdk.Method +import io.modelcontextprotocol.kotlin.sdk.RequestId +import io.modelcontextprotocol.kotlin.sdk.SUPPORTED_PROTOCOL_VERSIONS +import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport +import io.modelcontextprotocol.kotlin.sdk.shared.McpJson +import kotlinx.coroutines.job +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.decodeFromJsonElement +import kotlin.concurrent.atomics.AtomicBoolean +import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.uuid.ExperimentalUuidApi +import kotlin.uuid.Uuid + +internal const val MCP_SESSION_ID_HEADER = "mcp-session-id" +private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version" +private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID" + +/** + * Interface for resumability support via event storage + */ +public interface EventStore { + /** + * Stores an event for later retrieval + * @param streamId ID of the stream the event belongs to + * @param message The JSON-RPC message to store + * @returns The generated event ID for the stored event + */ + public suspend fun storeEvent(streamId: String, message: JSONRPCMessage): String + + /** + * Replays events after the specified event ID + * @param lastEventId The last event ID that was received + * @param sender Function to send events + * @return The stream ID for the replayed events + */ + public suspend fun replayEventsAfter( + lastEventId: String, + sender: suspend (eventId: String, message: JSONRPCMessage) -> Unit, + ): String +} + +/** + * A holder for an active request call. + * If enableJsonResponse is true, session is null. + * Otherwise, session is not null. + */ +private data class SessionContext(val session: ServerSSESession?, val call: ApplicationCall) + +/** + * Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. + * It supports both SSE streaming and direct HTTP responses. + * + * In stateful mode: + * - Session ID is generated and included in response headers + * - Session ID is always included in initialization responses + * - Requests with invalid session IDs are rejected with 404 Not Found + * - Non-initialization requests without a session ID are rejected with 400 Bad Request + * - State is maintained in-memory (connections, message history) + * + * In stateless mode: + * - No Session ID is included in any responses + * - No session validation is performed + * + * @param enableJsonResponse If true, the server will return JSON responses instead of starting an SSE stream. + * This can be useful for simple request/response scenarios without streaming. + * Default is false (SSE streams are preferred). + * @param enableDnsRebindingProtection Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured). + * Default is false for backwards compatibility. + * @param allowedHosts List of allowed host header values for DNS rebinding protection. + * If not specified, host validation is disabled. + * @param allowedOrigins List of allowed origin header values for DNS rebinding protection. + * If not specified, origin validation is disabled. + * @param eventStore Event store for resumability support + * If provided, resumability will be enabled, allowing clients to reconnect and resume messages + */ +@OptIn(ExperimentalUuidApi::class, ExperimentalAtomicApi::class) +public class StreamableHttpServerTransport( + private val enableJsonResponse: Boolean = false, + private val enableDnsRebindingProtection: Boolean = false, + private val allowedHosts: List? = null, + private val allowedOrigins: List? = null, + private val eventStore: EventStore? = null, +) : AbstractTransport() { + public var sessionId: String? = null + private set + + private var sessionIdGenerator: (() -> String)? = { Uuid.random().toString() } + private var onSessionInitialized: ((sessionId: String) -> Unit)? = null + private var onSessionClosed: ((sessionId: String) -> Unit)? = null + + private val started: AtomicBoolean = AtomicBoolean(false) + private val initialized: AtomicBoolean = AtomicBoolean(false) + + private val streamsMapping: ConcurrentMap = ConcurrentMap() + private val requestToStreamMapping: ConcurrentMap = ConcurrentMap() + private val requestToResponseMapping: ConcurrentMap = ConcurrentMap() + + private val sessionMutex = Mutex() + private val streamMutex = Mutex() + + private companion object { + const val STANDALONE_SSE_STREAM_ID = "_GET_stream" + } + + /** + * Function that generates a session ID for the transport. + * The session ID SHOULD be globally unique and cryptographically secure + * (e.g., a securely generated UUID, a JWT, or a cryptographic hash) + * + * Set undefined to disable session management. + */ + public fun setSessionIdGenerator(block: (() -> String)?) { + sessionIdGenerator = block + } + + /** + * A callback for session initialization events + * This is called when the server initializes a new session. + * Useful in cases when you need to register multiple mcp sessions + * and need to keep track of them. + */ + public fun setOnSessionInitialized(block: ((String) -> Unit)?) { + onSessionInitialized = block + } + + /** + * A callback for session close events + * This is called when the server closes a session due to a DELETE request. + * Useful in cases when you need to clean up resources associated with the session. + * Note that this is different from the transport closing, if you are handling + * HTTP requests from multiple nodes you might want to close each + * StreamableHTTPServerTransport after a request is completed while still keeping the + * session open/running. + */ + public fun setOnSessionClosed(block: ((String) -> Unit)?) { + onSessionClosed = block + } + + override suspend fun start() { + check(started.compareAndSet(expectedValue = false, newValue = true)) { + "StreamableHttpServerTransport already started! If using Server class, note that connect() calls start() automatically." + } + } + + override suspend fun send(message: JSONRPCMessage) { + val requestId: RequestId? = when (message) { + is JSONRPCResponse -> message.id + is JSONRPCError -> message.id + else -> null + } + + // Standalone SSE stream + if (requestId == null) { + require(message !is JSONRPCResponse && message !is JSONRPCError) { + "Cannot send a response on a standalone SSE stream unless resuming a previous client request" + } + val standaloneStream = streamsMapping[STANDALONE_SSE_STREAM_ID] ?: return + emitOnStream(STANDALONE_SSE_STREAM_ID, standaloneStream.session!!, message) + return + } + + val streamId = requestToStreamMapping[requestId] + ?: error("No connection established for request ID: $requestId") + val activeStream = streamsMapping[streamId] + + if (!enableJsonResponse) { + activeStream?.let { stream -> + emitOnStream(streamId, stream.session!!, message) + } + } + + val isTerminated = message is JSONRPCResponse || message is JSONRPCError + if (!isTerminated) return + + requestToResponseMapping[requestId] = message + val relatedIds = requestToStreamMapping.filterValues { it == streamId }.keys + + val allResponseReady = relatedIds.all { it in requestToResponseMapping } + if (!allResponseReady) return + + streamMutex.withLock { + if (activeStream == null) error("No connection established for request ID: $requestId") + + if (enableJsonResponse) { + activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString()) + sessionId?.let { activeStream.call.response.header(MCP_SESSION_ID_HEADER, it) } + val responses = relatedIds + .mapNotNull { requestToResponseMapping[it] } + .map { McpJson.encodeToString(it) } + val payload = if (responses.size == 1) { + responses.first() + } else { + responses + } + activeStream.call.respond(payload) + } else { + activeStream.session!!.close() + } + + // Clean up + relatedIds.forEach { requestId -> + requestToResponseMapping.remove(requestId) + requestToStreamMapping.remove(requestId) + } + } + } + + override suspend fun close() { + streamMutex.withLock { + streamsMapping.values.forEach { + try { + it.session?.close() + } catch (_: Exception) {} + } + streamsMapping.clear() + requestToResponseMapping.clear() + _onClose() + } + } + + /** + * Handles an incoming HTTP request, whether GET, POST or DELETE + */ + public suspend fun handleRequest(session: ServerSSESession?, call: ApplicationCall) { + validateHeaders(call)?.let { reason -> + call.reject(HttpStatusCode.Forbidden, ErrorCode.Unknown(-32000), reason) + _onError(Error(reason)) + return + } + + when (call.request.httpMethod) { + HttpMethod.Post -> handlePostRequest(session, call) + + HttpMethod.Get -> handleGetRequest(session, call) + + HttpMethod.Delete -> handleDeleteRequest(session, call) + + else -> call.run { + response.header(HttpHeaders.Allow, "GET, POST, DELETE") + reject(HttpStatusCode.MethodNotAllowed, ErrorCode.Unknown(-32000), "Method not allowed.") + } + } + } + + /** + * Handles POST requests containing JSON-RPC messages + */ + public suspend fun handlePostRequest(session: ServerSSESession?, call: ApplicationCall) { + try { + if (!enableJsonResponse && session == null) error("Server session can't be null with json response") + + val acceptHeader = call.request.header(HttpHeaders.Accept) + val isAcceptEventStream = acceptHeader.accepts(ContentType.Text.EventStream) + val isAcceptJson = acceptHeader.accepts(ContentType.Application.Json) + + if (!isAcceptEventStream || !isAcceptJson) { + call.reject( + HttpStatusCode.NotAcceptable, + ErrorCode.Unknown(-32000), + "Not Acceptable: Client must accept both application/json and text/event-stream", + ) + return + } + + if (!call.request.contentType().match(ContentType.Application.Json)) { + call.reject( + HttpStatusCode.UnsupportedMediaType, + ErrorCode.Unknown(-32000), + "Unsupported Media Type: Content-Type must be application/json", + ) + return + } + + val messages = parseBody(call) ?: return + val isInitializationRequest = messages.any { + it is JSONRPCRequest && it.method == Method.Defined.Initialize.value + } + + if (isInitializationRequest) { + if (initialized.load() && sessionId != null) { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Defined.InvalidRequest, + "Invalid Request: Server already initialized", + ) + return + } + if (messages.size > 1) { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Defined.InvalidRequest, + "Invalid Request: Only one initialization request is allowed", + ) + return + } + + sessionMutex.withLock { + if (sessionId != null) return@withLock + sessionId = sessionIdGenerator?.invoke() + initialized.store(true) + sessionId?.let { onSessionInitialized?.invoke(it) } + } + } else { + if (!validateSession(call) || !validateProtocolVersion(call)) return + } + + val hasRequest = messages.any { it is JSONRPCRequest } + if (!hasRequest) { + call.respondNullable(status = HttpStatusCode.Accepted, message = null) + messages.forEach { message -> _onMessage(message) } + return + } + + val streamId = Uuid.random().toString() + if (!enableJsonResponse) { + call.appendSseHeaders() + session!!.send(data = "") // flush headers immediately + } + + streamMutex.withLock { + streamsMapping[streamId] = SessionContext(session, call) + messages.filterIsInstance().forEach { requestToStreamMapping[it.id] = streamId } + } + call.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(streamId) } + + messages.forEach { message -> _onMessage(message) } + } catch (e: Exception) { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Defined.ParseError, + "Parse error: ${e.message}", + ) + _onError(e) + } + } + + public suspend fun handleGetRequest(session: ServerSSESession?, call: ApplicationCall) { + if (enableJsonResponse) { + call.reject( + HttpStatusCode.MethodNotAllowed, + ErrorCode.Unknown(-32000), + "Method not allowed.", + ) + return + } + session!! + + val acceptHeader = call.request.header(HttpHeaders.Accept) + if (!acceptHeader.accepts(ContentType.Text.EventStream)) { + call.reject( + HttpStatusCode.NotAcceptable, + ErrorCode.Unknown(-32000), + "Not Acceptable: Client must accept text/event-stream", + ) + return + } + + if (!validateSession(call) || !validateProtocolVersion(call)) return + + eventStore?.let { store -> + call.request.header(MCP_RESUMPTION_TOKEN_HEADER)?.let { lastEventId -> + replayEvents(store, lastEventId, session) + return + } + } + + if (STANDALONE_SSE_STREAM_ID in streamsMapping) { + call.reject( + HttpStatusCode.Conflict, + ErrorCode.Unknown(-32000), + "Conflict: Only one SSE stream is allowed per session", + ) + return + } + + call.appendSseHeaders() + session.send(data = "") // flush headers immediately + streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(session, call) + session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) } + } + + public suspend fun handleDeleteRequest(session: ServerSSESession?, call: ApplicationCall) { + if (enableJsonResponse) { + call.reject( + HttpStatusCode.MethodNotAllowed, + ErrorCode.Unknown(-32000), + "Method not allowed.", + ) + } + + if (!validateSession(call) || !validateProtocolVersion(call)) return + sessionId?.let { onSessionClosed?.invoke(it) } + close() + call.respondNullable(status = HttpStatusCode.OK, message = null) + } + + private suspend fun replayEvents(store: EventStore, lastEventId: String, session: ServerSSESession) { + val call: ApplicationCall = session.call + + try { + call.appendSseHeaders() + val streamId = store.replayEventsAfter(lastEventId) { eventId, message -> + try { + session.send( + event = "message", + id = eventId, + data = McpJson.encodeToString(message), + ) + } catch (e: Exception) { + _onError(e) + } + } + streamsMapping[streamId] = SessionContext(session, call) + } catch (e: Exception) { + _onError(e) + } + } + + private suspend fun validateSession(call: ApplicationCall): Boolean { + if (sessionIdGenerator == null) return true + + if (!initialized.load()) { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Unknown(-32000), + "Bad Request: Server not initialized", + ) + return false + } + + val headerId = call.request.header(MCP_SESSION_ID_HEADER) + + return when { + headerId == null -> { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Unknown(-32000), + "Bad Request: Mcp-Session-Id header is required", + ) + false + } + + headerId != sessionId -> { + call.reject( + HttpStatusCode.NotFound, + ErrorCode.Unknown(-32001), + "Session not found", + ) + false + } + + else -> true + } + } + + private suspend fun validateProtocolVersion(call: ApplicationCall): Boolean { + val version = call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: LATEST_PROTOCOL_VERSION + + return when (version) { + !in SUPPORTED_PROTOCOL_VERSIONS -> { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Unknown(-32000), + "Bad Request: Unsupported protocol version (supported versions: ${ + SUPPORTED_PROTOCOL_VERSIONS.joinToString( + ", ", + ) + })", + ) + false + } + + else -> true + } + } + + private fun validateHeaders(call: ApplicationCall): String? { + if (!enableDnsRebindingProtection) return null + + allowedHosts?.let { hosts -> + val hostHeader = call.request.host().substringBefore(':').lowercase() + if (hostHeader !in hosts.map { it.substringBefore(':').lowercase() }) { + return "Invalid Host header: $hostHeader" + } + } + + allowedOrigins?.let { origins -> + val originHeader = call.request.headers[HttpHeaders.Origin]?.removeSuffix("/")?.lowercase() + if (originHeader !in origins.map { it.removeSuffix("/").lowercase() }) { + return "Invalid Origin header: $originHeader" + } + } + + return null + } + + private suspend fun parseBody(call: ApplicationCall): List? { + val body = call.receiveText() + return when (val element = McpJson.parseToJsonElement(body)) { + is JsonObject -> listOf(McpJson.decodeFromJsonElement(element)) + + is JsonArray -> McpJson.decodeFromJsonElement>(element) + + else -> { + call.reject( + HttpStatusCode.BadRequest, + ErrorCode.Defined.InvalidRequest, + "Invalid Request: unable to parse JSON body", + ) + return null + } + } + } + + private fun String?.accepts(mime: ContentType): Boolean { + if (this == null) return false + + val escaped = Regex.escape(mime.toString()) + val pattern = Regex("""(^|,\s*)$escaped(\s*(;|,|$))""", RegexOption.IGNORE_CASE) + return pattern.containsMatchIn(this) + } + + private suspend fun emitOnStream(streamId: String, session: ServerSSESession, message: JSONRPCMessage) { + val eventId = eventStore?.storeEvent(streamId, message) + try { + session.send(event = "message", id = eventId, data = McpJson.encodeToString(message)) + } catch (_: Exception) { + streamsMapping.remove(streamId) + } + } + + private fun ApplicationCall.appendSseHeaders() { + this.response.headers.append(HttpHeaders.ContentType, ContentType.Text.EventStream.toString()) + this.response.headers.append(HttpHeaders.CacheControl, "no-cache, no-transform") + this.response.headers.append(HttpHeaders.Connection, "keep-alive") + sessionId?.let { this.response.headers.append(MCP_SESSION_ID_HEADER, it) } + this.response.status(HttpStatusCode.OK) + } +} + +internal suspend fun ApplicationCall.reject(status: HttpStatusCode, code: ErrorCode, message: String) { + this.response.status(status) + this.respond( + JSONRPCResponse( + id = null, + error = JSONRPCError(message = message, code = code), + ), + ) +} From ee3e6809a6e34b7da1fac4ec66864b40e7dc320d Mon Sep 17 00:00:00 2001 From: Xiangyu Zhang Date: Wed, 29 Oct 2025 22:48:49 +0800 Subject: [PATCH 03/15] update from comments --- kotlin-sdk-server/api/kotlin-sdk-server.api | 4 -- .../kotlin/sdk/server/KtorServer.kt | 48 ++++++++++--------- .../server/StreamableHttpServerTransport.kt | 5 +- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index 3693cc40..b4626230 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -1,7 +1,3 @@ -public final class io/modelcontextprotocol/kotlin/sdk/LibVersionKt { - public static final field LIB_VERSION Ljava/lang/String; -} - public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore { public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt index a2911fa4..f1920ae5 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt @@ -14,7 +14,6 @@ import io.ktor.server.routing.routing import io.ktor.server.sse.SSE import io.ktor.server.sse.ServerSSESession import io.ktor.server.sse.sse -import io.ktor.util.collections.ConcurrentMap import io.ktor.utils.io.KtorDsl import kotlinx.atomicfu.AtomicRef import kotlinx.atomicfu.atomic @@ -23,16 +22,19 @@ import kotlinx.collections.immutable.PersistentMap import kotlinx.collections.immutable.toPersistentMap import kotlinx.coroutines.awaitCancellation import io.modelcontextprotocol.kotlin.sdk.ErrorCode +import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport private val logger = KotlinLogging.logger {} -internal class SseTransportManager(transports: Map = emptyMap()) { - private val transports: AtomicRef> = atomic(transports.toPersistentMap()) +internal class TransportManager(transports: Map = emptyMap()) { + private val transports: AtomicRef> = atomic(transports.toPersistentMap()) - fun getTransport(sessionId: String): SseServerTransport? = transports.value[sessionId] + fun hasTransport(sessionId: String): Boolean = transports.value.containsKey(sessionId) - fun addTransport(transport: SseServerTransport) { - transports.update { it.put(transport.sessionId, transport) } + fun getTransport(sessionId: String): AbstractTransport? = transports.value[sessionId] + + fun addTransport(sessionId: String, transport: AbstractTransport) { + transports.update { it.put(sessionId, transport) } } fun removeTransport(sessionId: String) { @@ -52,14 +54,14 @@ public fun Routing.mcp(path: String, block: ServerSSESession.() -> Server) { */ @KtorDsl public fun Routing.mcp(block: ServerSSESession.() -> Server) { - val sseTransportManager = SseTransportManager() + val transportManager = TransportManager() sse { - mcpSseEndpoint("", sseTransportManager, block) + mcpSseEndpoint("", transportManager, block) } post { - mcpPostEndpoint(sseTransportManager) + mcpPostEndpoint(transportManager) } } @@ -80,12 +82,12 @@ public fun Application.mcpStreamableHttp( eventStore: EventStore? = null, block: RoutingContext.() -> Server, ) { - val transports = ConcurrentMap() + val transportManager = TransportManager() routing { post("/mcp") { mcpStreamableHttpEndpoint( - transports, + transportManager, enableDnsRebindingProtection, allowedHosts, allowedOrigins, @@ -119,16 +121,16 @@ public fun Application.mcpStatelessStreamableHttp( internal suspend fun ServerSSESession.mcpSseEndpoint( postEndpoint: String, - sseTransportManager: SseTransportManager, + transportManager: TransportManager, block: ServerSSESession.() -> Server, ) { - val transport = mcpSseTransport(postEndpoint, sseTransportManager) + val transport = mcpSseTransport(postEndpoint, transportManager) val server = block() server.onClose { logger.info { "Server connection closed for sessionId: ${transport.sessionId}" } - sseTransportManager.removeTransport(transport.sessionId) + transportManager.removeTransport(transport.sessionId) } server.createSession(transport) @@ -140,17 +142,17 @@ internal suspend fun ServerSSESession.mcpSseEndpoint( internal fun ServerSSESession.mcpSseTransport( postEndpoint: String, - sseTransportManager: SseTransportManager, + transportManager: TransportManager, ): SseServerTransport { val transport = SseServerTransport(postEndpoint, this) - sseTransportManager.addTransport(transport) + transportManager.addTransport(transport.sessionId, transport) logger.info { "New SSE connection established and stored with sessionId: ${transport.sessionId}" } return transport } internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( - transports: ConcurrentMap, + transportManager: TransportManager, enableDnsRebindingProtection: Boolean = false, allowedHosts: List? = null, allowedOrigins: List? = null, @@ -158,8 +160,8 @@ internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( block: RoutingContext.() -> Server, ) { val sessionId = this.call.request.header(MCP_SESSION_ID_HEADER) - val transport = if (sessionId != null && transports.containsKey(sessionId)) { - transports[sessionId]!! + val transport = if (sessionId != null && transportManager.hasTransport(sessionId)) { + transportManager.getTransport(sessionId) } else if (sessionId == null) { val transport = StreamableHttpServerTransport( enableDnsRebindingProtection = enableDnsRebindingProtection, @@ -170,7 +172,7 @@ internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( ) transport.setOnSessionInitialized { sessionId -> - transports[sessionId] = transport + transportManager.addTransport(sessionId, transport) logger.info { "New StreamableHttp connection established and stored with sessionId: $sessionId" } } @@ -196,7 +198,7 @@ internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( return } - transport.handleRequest(null, this.call) + (transport as StreamableHttpServerTransport).handleRequest(null, this.call) logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" } } @@ -231,7 +233,7 @@ internal suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint( logger.debug { "Server connected to transport without sessionId" } } -internal suspend fun RoutingContext.mcpPostEndpoint(sseTransportManager: SseTransportManager) { +internal suspend fun RoutingContext.mcpPostEndpoint(transportManager: TransportManager) { val sessionId: String = call.request.queryParameters["sessionId"] ?: run { call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided") return @@ -239,7 +241,7 @@ internal suspend fun RoutingContext.mcpPostEndpoint(sseTransportManager: SseTran logger.debug { "Received message for sessionId: $sessionId" } - val transport = sseTransportManager.getTransport(sessionId) + val transport = transportManager.getTransport(sessionId) as SseServerTransport? if (transport == null) { logger.warn { "Session not found for sessionId: $sessionId" } call.respond(HttpStatusCode.NotFound, "Session not found") diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index af765a87..82f8be0c 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -12,6 +12,7 @@ import io.ktor.server.request.httpMethod import io.ktor.server.request.receiveText import io.ktor.server.response.header import io.ktor.server.response.respond +import io.ktor.server.response.respondBytes import io.ktor.server.response.respondNullable import io.ktor.server.sse.ServerSSESession import io.ktor.util.collections.ConcurrentMap @@ -332,7 +333,7 @@ public class StreamableHttpServerTransport( val hasRequest = messages.any { it is JSONRPCRequest } if (!hasRequest) { - call.respondNullable(status = HttpStatusCode.Accepted, message = null) + call.respondBytes(status = HttpStatusCode.Accepted, bytes = ByteArray(0)) messages.forEach { message -> _onMessage(message) } return } @@ -568,7 +569,7 @@ internal suspend fun ApplicationCall.reject(status: HttpStatusCode, code: ErrorC this.response.status(status) this.respond( JSONRPCResponse( - id = null, + id = RequestId.StringId("server-error"), error = JSONRPCError(message = message, code = code), ), ) From f62a6de703bd73771d8eef21408b54c9de483962 Mon Sep 17 00:00:00 2001 From: Xiangyu Zhang Date: Wed, 29 Oct 2025 23:26:41 +0800 Subject: [PATCH 04/15] kdoc --- .../modelcontextprotocol/kotlin/sdk/server/KtorServer.kt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt index f1920ae5..26fbb61a 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt @@ -74,6 +74,10 @@ public fun Application.mcp(block: ServerSSESession.() -> Server) { } } +/* +* Configures the Ktor Application to handle Model Context Protocol (MCP) over Streamable Http. +* It currently only works with JSON response. +*/ @KtorDsl public fun Application.mcpStreamableHttp( enableDnsRebindingProtection: Boolean = false, @@ -98,6 +102,10 @@ public fun Application.mcpStreamableHttp( } } +/* +* Configures the Ktor Application to handle Model Context Protocol (MCP) over stateless Streamable Http. +* It currently only works with JSON response. +*/ @KtorDsl public fun Application.mcpStatelessStreamableHttp( enableDnsRebindingProtection: Boolean = false, From 4d019804dc93b3c769b8d5ad15aef3bde652fabd Mon Sep 17 00:00:00 2001 From: devcrocod Date: Tue, 18 Nov 2025 13:24:16 +0100 Subject: [PATCH 05/15] Refactored code to streamline error handling and consolidate `RPCError` usage. Updated references from `ErrorCode` to `RPCError.ErrorCode`. Adjusted imports for consistency with `types` package structure. --- .../kotlin/sdk/types/jsonRpc.kt | 2 +- kotlin-sdk-server/api/kotlin-sdk-server.api | 4 +- .../kotlin/sdk/server/KtorServer.kt | 10 +-- .../server/StreamableHttpServerTransport.kt | 65 ++++++++++--------- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt index 8a580eb0..cb2c76ad 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt @@ -197,7 +197,7 @@ public data class JSONRPCResponse(val id: RequestId, val result: RequestResult = * @property error Details about the error that occurred, including error code and message. */ @Serializable -public data class JSONRPCError(val id: RequestId, val error: RPCError) : JSONRPCMessage { +public data class JSONRPCError(val id: RequestId?, val error: RPCError) : JSONRPCMessage { @EncodeDefault override val jsonrpc: String = JSONRPC_VERSION } diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index b4626230..deaf4ca8 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -1,6 +1,6 @@ public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore { public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class io/modelcontextprotocol/kotlin/sdk/server/KtorServerKt { @@ -167,7 +167,7 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServe public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt index 26fbb61a..59af472b 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt @@ -15,14 +15,14 @@ import io.ktor.server.sse.SSE import io.ktor.server.sse.ServerSSESession import io.ktor.server.sse.sse import io.ktor.utils.io.KtorDsl +import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport +import io.modelcontextprotocol.kotlin.sdk.types.RPCError import kotlinx.atomicfu.AtomicRef import kotlinx.atomicfu.atomic import kotlinx.atomicfu.update import kotlinx.collections.immutable.PersistentMap import kotlinx.collections.immutable.toPersistentMap import kotlinx.coroutines.awaitCancellation -import io.modelcontextprotocol.kotlin.sdk.ErrorCode -import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport private val logger = KotlinLogging.logger {} @@ -190,7 +190,7 @@ internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( logger.info { "Server connection closed for sessionId: ${transport.sessionId}" } } - server.connect(transport) + server.createSession(transport) transport } else { @@ -200,7 +200,7 @@ internal suspend fun RoutingContext.mcpStreamableHttpEndpoint( if (transport == null) { this.call.reject( HttpStatusCode.BadRequest, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Bad Request: No valid session ID provided", ) return @@ -234,7 +234,7 @@ internal suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint( logger.info { "Server connection closed without sessionId" } } - server.connect(transport) + server.createSession(transport) transport.handleRequest(null, this.call) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 82f8be0c..8efbda89 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -16,17 +16,17 @@ import io.ktor.server.response.respondBytes import io.ktor.server.response.respondNullable import io.ktor.server.sse.ServerSSESession import io.ktor.util.collections.ConcurrentMap -import io.modelcontextprotocol.kotlin.sdk.ErrorCode -import io.modelcontextprotocol.kotlin.sdk.JSONRPCError -import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage -import io.modelcontextprotocol.kotlin.sdk.JSONRPCRequest -import io.modelcontextprotocol.kotlin.sdk.JSONRPCResponse -import io.modelcontextprotocol.kotlin.sdk.LATEST_PROTOCOL_VERSION -import io.modelcontextprotocol.kotlin.sdk.Method -import io.modelcontextprotocol.kotlin.sdk.RequestId -import io.modelcontextprotocol.kotlin.sdk.SUPPORTED_PROTOCOL_VERSIONS import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport -import io.modelcontextprotocol.kotlin.sdk.shared.McpJson +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCError +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest +import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse +import io.modelcontextprotocol.kotlin.sdk.types.LATEST_PROTOCOL_VERSION +import io.modelcontextprotocol.kotlin.sdk.types.McpJson +import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.RPCError +import io.modelcontextprotocol.kotlin.sdk.types.RequestId +import io.modelcontextprotocol.kotlin.sdk.types.SUPPORTED_PROTOCOL_VERSIONS import kotlinx.coroutines.job import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -237,7 +237,8 @@ public class StreamableHttpServerTransport( streamsMapping.values.forEach { try { it.session?.close() - } catch (_: Exception) {} + } catch (_: Exception) { + } } streamsMapping.clear() requestToResponseMapping.clear() @@ -250,7 +251,7 @@ public class StreamableHttpServerTransport( */ public suspend fun handleRequest(session: ServerSSESession?, call: ApplicationCall) { validateHeaders(call)?.let { reason -> - call.reject(HttpStatusCode.Forbidden, ErrorCode.Unknown(-32000), reason) + call.reject(HttpStatusCode.Forbidden, RPCError.ErrorCode.CONNECTION_CLOSED, reason) _onError(Error(reason)) return } @@ -264,7 +265,7 @@ public class StreamableHttpServerTransport( else -> call.run { response.header(HttpHeaders.Allow, "GET, POST, DELETE") - reject(HttpStatusCode.MethodNotAllowed, ErrorCode.Unknown(-32000), "Method not allowed.") + reject(HttpStatusCode.MethodNotAllowed, RPCError.ErrorCode.CONNECTION_CLOSED, "Method not allowed.") } } } @@ -283,7 +284,7 @@ public class StreamableHttpServerTransport( if (!isAcceptEventStream || !isAcceptJson) { call.reject( HttpStatusCode.NotAcceptable, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Not Acceptable: Client must accept both application/json and text/event-stream", ) return @@ -292,7 +293,7 @@ public class StreamableHttpServerTransport( if (!call.request.contentType().match(ContentType.Application.Json)) { call.reject( HttpStatusCode.UnsupportedMediaType, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Unsupported Media Type: Content-Type must be application/json", ) return @@ -307,7 +308,7 @@ public class StreamableHttpServerTransport( if (initialized.load() && sessionId != null) { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Defined.InvalidRequest, + RPCError.ErrorCode.INVALID_REQUEST, "Invalid Request: Server already initialized", ) return @@ -315,7 +316,7 @@ public class StreamableHttpServerTransport( if (messages.size > 1) { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Defined.InvalidRequest, + RPCError.ErrorCode.INVALID_REQUEST, "Invalid Request: Only one initialization request is allowed", ) return @@ -354,7 +355,7 @@ public class StreamableHttpServerTransport( } catch (e: Exception) { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Defined.ParseError, + RPCError.ErrorCode.PARSE_ERROR, "Parse error: ${e.message}", ) _onError(e) @@ -365,7 +366,7 @@ public class StreamableHttpServerTransport( if (enableJsonResponse) { call.reject( HttpStatusCode.MethodNotAllowed, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Method not allowed.", ) return @@ -376,7 +377,7 @@ public class StreamableHttpServerTransport( if (!acceptHeader.accepts(ContentType.Text.EventStream)) { call.reject( HttpStatusCode.NotAcceptable, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Not Acceptable: Client must accept text/event-stream", ) return @@ -394,7 +395,7 @@ public class StreamableHttpServerTransport( if (STANDALONE_SSE_STREAM_ID in streamsMapping) { call.reject( HttpStatusCode.Conflict, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Conflict: Only one SSE stream is allowed per session", ) return @@ -410,7 +411,7 @@ public class StreamableHttpServerTransport( if (enableJsonResponse) { call.reject( HttpStatusCode.MethodNotAllowed, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Method not allowed.", ) } @@ -449,7 +450,7 @@ public class StreamableHttpServerTransport( if (!initialized.load()) { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Bad Request: Server not initialized", ) return false @@ -461,7 +462,7 @@ public class StreamableHttpServerTransport( headerId == null -> { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Bad Request: Mcp-Session-Id header is required", ) false @@ -470,7 +471,7 @@ public class StreamableHttpServerTransport( headerId != sessionId -> { call.reject( HttpStatusCode.NotFound, - ErrorCode.Unknown(-32001), + -32001, "Session not found", ) false @@ -487,7 +488,7 @@ public class StreamableHttpServerTransport( !in SUPPORTED_PROTOCOL_VERSIONS -> { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Unknown(-32000), + RPCError.ErrorCode.CONNECTION_CLOSED, "Bad Request: Unsupported protocol version (supported versions: ${ SUPPORTED_PROTOCOL_VERSIONS.joinToString( ", ", @@ -531,10 +532,10 @@ public class StreamableHttpServerTransport( else -> { call.reject( HttpStatusCode.BadRequest, - ErrorCode.Defined.InvalidRequest, + RPCError.ErrorCode.INVALID_REQUEST, "Invalid Request: unable to parse JSON body", ) - return null + null } } } @@ -565,12 +566,12 @@ public class StreamableHttpServerTransport( } } -internal suspend fun ApplicationCall.reject(status: HttpStatusCode, code: ErrorCode, message: String) { +internal suspend fun ApplicationCall.reject(status: HttpStatusCode, code: Int, message: String) { this.response.status(status) this.respond( - JSONRPCResponse( - id = RequestId.StringId("server-error"), - error = JSONRPCError(message = message, code = code), + JSONRPCError( + id = null, + error = RPCError(code = code, message = message), ), ) } From 66f88a1351e47b51780f848ea2384772a7bad5ec Mon Sep 17 00:00:00 2001 From: devcrocod Date: Wed, 19 Nov 2025 18:21:49 +0100 Subject: [PATCH 06/15] Updated `Transport.send` method signature across all implementations to include `TransportSendOptions`. Added support for optional resumption tokens and progress callbacks in request handling. --- kotlin-sdk-core/api/kotlin-sdk-core.api | 1 + kotlin-sdk-server/api/kotlin-sdk-server.api | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/kotlin-sdk-core/api/kotlin-sdk-core.api b/kotlin-sdk-core/api/kotlin-sdk-core.api index 0158259a..6620ba4a 100644 --- a/kotlin-sdk-core/api/kotlin-sdk-core.api +++ b/kotlin-sdk-core/api/kotlin-sdk-core.api @@ -995,6 +995,7 @@ public final class io/modelcontextprotocol/kotlin/sdk/types/ClientResult$Default } public final class io/modelcontextprotocol/kotlin/sdk/types/CommonKt { + public static final field DEFAULT_NEGOTIATED_PROTOCOL_VERSION Ljava/lang/String; public static final field LATEST_PROTOCOL_VERSION Ljava/lang/String; public static final fun ProgressToken (J)Lio/modelcontextprotocol/kotlin/sdk/types/RequestId; public static final fun ProgressToken (Ljava/lang/String;)Lio/modelcontextprotocol/kotlin/sdk/types/RequestId; diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index deaf4ca8..93b590a6 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -163,11 +163,11 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServe public synthetic fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun getSessionId ()Ljava/lang/String; - public final fun handleDeleteRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun handleDeleteRequest (Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V From db8894acb9acb2bf397cc0fa48cab63fcdcd4124 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Wed, 19 Nov 2025 18:40:57 +0100 Subject: [PATCH 07/15] Refactored `StreamableHttpServerTransport` to improve nullable safety, simplify `session` handling, and replace `LATEST_PROTOCOL_VERSION` with `DEFAULT_NEGOTIATED_PROTOCOL_VERSION`. Adjusted request validation and related imports. --- .../kotlin/sdk/types/common.kt | 4 +- .../server/StreamableHttpServerTransport.kt | 44 +++++++------------ 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/common.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/common.kt index f715be73..e22dab7e 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/common.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/common.kt @@ -1,7 +1,5 @@ package io.modelcontextprotocol.kotlin.sdk.types -import io.modelcontextprotocol.kotlin.sdk.types.Icon.Theme.Dark -import io.modelcontextprotocol.kotlin.sdk.types.Icon.Theme.Light import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonObject @@ -12,6 +10,8 @@ import kotlinx.serialization.json.JsonObject public const val LATEST_PROTOCOL_VERSION: String = "2025-06-18" +public const val DEFAULT_NEGOTIATED_PROTOCOL_VERSION: String = "2025-03-26" + public val SUPPORTED_PROTOCOL_VERSIONS: List = listOf( LATEST_PROTOCOL_VERSION, "2025-03-26", diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 8efbda89..ec3373d7 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -12,16 +12,16 @@ import io.ktor.server.request.httpMethod import io.ktor.server.request.receiveText import io.ktor.server.response.header import io.ktor.server.response.respond -import io.ktor.server.response.respondBytes import io.ktor.server.response.respondNullable import io.ktor.server.sse.ServerSSESession import io.ktor.util.collections.ConcurrentMap import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport +import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions +import io.modelcontextprotocol.kotlin.sdk.types.DEFAULT_NEGOTIATED_PROTOCOL_VERSION import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCError import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse -import io.modelcontextprotocol.kotlin.sdk.types.LATEST_PROTOCOL_VERSION import io.modelcontextprotocol.kotlin.sdk.types.McpJson import io.modelcontextprotocol.kotlin.sdk.types.Method import io.modelcontextprotocol.kotlin.sdk.types.RPCError @@ -169,7 +169,7 @@ public class StreamableHttpServerTransport( } } - override suspend fun send(message: JSONRPCMessage) { + override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) { val requestId: RequestId? = when (message) { is JSONRPCResponse -> message.id is JSONRPCError -> message.id @@ -182,17 +182,16 @@ public class StreamableHttpServerTransport( "Cannot send a response on a standalone SSE stream unless resuming a previous client request" } val standaloneStream = streamsMapping[STANDALONE_SSE_STREAM_ID] ?: return - emitOnStream(STANDALONE_SSE_STREAM_ID, standaloneStream.session!!, message) + emitOnStream(STANDALONE_SSE_STREAM_ID, standaloneStream.session, message) return } - val streamId = requestToStreamMapping[requestId] - ?: error("No connection established for request ID: $requestId") + val streamId = requestToStreamMapping[requestId] ?: error("No connection established for request id $requestId") val activeStream = streamsMapping[streamId] if (!enableJsonResponse) { activeStream?.let { stream -> - emitOnStream(streamId, stream.session!!, message) + emitOnStream(streamId, stream.session, message) } } @@ -202,8 +201,7 @@ public class StreamableHttpServerTransport( requestToResponseMapping[requestId] = message val relatedIds = requestToStreamMapping.filterValues { it == streamId }.keys - val allResponseReady = relatedIds.all { it in requestToResponseMapping } - if (!allResponseReady) return + if (relatedIds.any { it !in requestToResponseMapping }) return streamMutex.withLock { if (activeStream == null) error("No connection established for request ID: $requestId") @@ -211,9 +209,7 @@ public class StreamableHttpServerTransport( if (enableJsonResponse) { activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString()) sessionId?.let { activeStream.call.response.header(MCP_SESSION_ID_HEADER, it) } - val responses = relatedIds - .mapNotNull { requestToResponseMapping[it] } - .map { McpJson.encodeToString(it) } + val responses = relatedIds.mapNotNull { requestToResponseMapping[it] } val payload = if (responses.size == 1) { responses.first() } else { @@ -221,7 +217,7 @@ public class StreamableHttpServerTransport( } activeStream.call.respond(payload) } else { - activeStream.session!!.close() + activeStream.session?.close() } // Clean up @@ -261,7 +257,7 @@ public class StreamableHttpServerTransport( HttpMethod.Get -> handleGetRequest(session, call) - HttpMethod.Delete -> handleDeleteRequest(session, call) + HttpMethod.Delete -> handleDeleteRequest(call) else -> call.run { response.header(HttpHeaders.Allow, "GET, POST, DELETE") @@ -334,7 +330,7 @@ public class StreamableHttpServerTransport( val hasRequest = messages.any { it is JSONRPCRequest } if (!hasRequest) { - call.respondBytes(status = HttpStatusCode.Accepted, bytes = ByteArray(0)) + call.respondNullable(status = HttpStatusCode.Accepted, message = null) messages.forEach { message -> _onMessage(message) } return } @@ -342,7 +338,7 @@ public class StreamableHttpServerTransport( val streamId = Uuid.random().toString() if (!enableJsonResponse) { call.appendSseHeaders() - session!!.send(data = "") // flush headers immediately + session?.send(data = "") // flush headers immediately } streamMutex.withLock { @@ -407,15 +403,7 @@ public class StreamableHttpServerTransport( session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) } } - public suspend fun handleDeleteRequest(session: ServerSSESession?, call: ApplicationCall) { - if (enableJsonResponse) { - call.reject( - HttpStatusCode.MethodNotAllowed, - RPCError.ErrorCode.CONNECTION_CLOSED, - "Method not allowed.", - ) - } - + public suspend fun handleDeleteRequest(call: ApplicationCall) { if (!validateSession(call) || !validateProtocolVersion(call)) return sessionId?.let { onSessionClosed?.invoke(it) } close() @@ -482,7 +470,7 @@ public class StreamableHttpServerTransport( } private suspend fun validateProtocolVersion(call: ApplicationCall): Boolean { - val version = call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: LATEST_PROTOCOL_VERSION + val version = call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION return when (version) { !in SUPPORTED_PROTOCOL_VERSIONS -> { @@ -548,10 +536,10 @@ public class StreamableHttpServerTransport( return pattern.containsMatchIn(this) } - private suspend fun emitOnStream(streamId: String, session: ServerSSESession, message: JSONRPCMessage) { + private suspend fun emitOnStream(streamId: String, session: ServerSSESession?, message: JSONRPCMessage) { val eventId = eventStore?.storeEvent(streamId, message) try { - session.send(event = "message", id = eventId, data = McpJson.encodeToString(message)) + session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message)) } catch (_: Exception) { streamsMapping.remove(streamId) } From 68da84251b3e05931faddd0276372b3b33e41c41 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Wed, 19 Nov 2025 19:19:51 +0100 Subject: [PATCH 08/15] Enforced message size limits, improved error handling in event replay and request validation, and optimized DNS rebind protection logic. Added safeguards for headers and session cleanups. --- .../server/StreamableHttpServerTransport.kt | 50 ++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index ec3373d7..501b3955 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -7,7 +7,6 @@ import io.ktor.http.HttpStatusCode import io.ktor.server.application.ApplicationCall import io.ktor.server.request.contentType import io.ktor.server.request.header -import io.ktor.server.request.host import io.ktor.server.request.httpMethod import io.ktor.server.request.receiveText import io.ktor.server.response.header @@ -41,6 +40,7 @@ import kotlin.uuid.Uuid internal const val MCP_SESSION_ID_HEADER = "mcp-session-id" private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version" private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID" +private const val MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024 // 4 MB /** * Interface for resumability support via event storage @@ -415,6 +415,8 @@ public class StreamableHttpServerTransport( try { call.appendSseHeaders() + session.send(data = "") // flush headers immediately + val streamId = store.replayEventsAfter(lastEventId) { eventId, message -> try { session.send( @@ -423,10 +425,16 @@ public class StreamableHttpServerTransport( data = McpJson.encodeToString(message), ) } catch (e: Exception) { - _onError(e) + _onError(IllegalStateException("Failed to replay event: ${e.message}", e)) } } + streamsMapping[streamId] = SessionContext(session, call) + + session.coroutineContext.job.invokeOnCompletion { throwable -> + streamsMapping.remove(streamId) + throwable?.let { _onError(it) } + } } catch (e: Exception) { _onError(e) } @@ -494,15 +502,19 @@ public class StreamableHttpServerTransport( if (!enableDnsRebindingProtection) return null allowedHosts?.let { hosts -> - val hostHeader = call.request.host().substringBefore(':').lowercase() - if (hostHeader !in hosts.map { it.substringBefore(':').lowercase() }) { + val hostHeader = call.request.headers[HttpHeaders.Host]?.lowercase() + val allowedHostsLowercase = hosts.map { it.lowercase() } + + if (hostHeader == null || hostHeader !in allowedHostsLowercase) { return "Invalid Host header: $hostHeader" } } allowedOrigins?.let { origins -> - val originHeader = call.request.headers[HttpHeaders.Origin]?.removeSuffix("/")?.lowercase() - if (originHeader !in origins.map { it.removeSuffix("/").lowercase() }) { + val originHeader = call.request.headers[HttpHeaders.Origin]?.lowercase() + val allowedOriginsLowercase = origins.map { it.lowercase() } + + if (originHeader == null || originHeader !in allowedOriginsLowercase) { return "Invalid Origin header: $originHeader" } } @@ -511,7 +523,26 @@ public class StreamableHttpServerTransport( } private suspend fun parseBody(call: ApplicationCall): List? { + val contentLength = call.request.header(HttpHeaders.ContentLength)?.toIntOrNull() ?: 0 + if (contentLength > MAXIMUM_MESSAGE_SIZE) { + call.reject( + HttpStatusCode.PayloadTooLarge, + RPCError.ErrorCode.INVALID_REQUEST, + "Invalid Request: message size exceeds maximum of ${MAXIMUM_MESSAGE_SIZE / (1024 * 1024)} MB", + ) + return null + } + val body = call.receiveText() + if (body.length > MAXIMUM_MESSAGE_SIZE) { + call.reject( + HttpStatusCode.PayloadTooLarge, + RPCError.ErrorCode.INVALID_REQUEST, + "Invalid Request: message size exceeds maximum of ${MAXIMUM_MESSAGE_SIZE / (1024 * 1024)} MB", + ) + return null + } + return when (val element = McpJson.parseToJsonElement(body)) { is JsonObject -> listOf(McpJson.decodeFromJsonElement(element)) @@ -556,10 +587,5 @@ public class StreamableHttpServerTransport( internal suspend fun ApplicationCall.reject(status: HttpStatusCode, code: Int, message: String) { this.response.status(status) - this.respond( - JSONRPCError( - id = null, - error = RPCError(code = code, message = message), - ), - ) + this.respond(JSONRPCError(id = null, error = RPCError(code = code, message = message))) } From 13ea9acdd7ab2c58a0ee3a9023b314edb0bbcdd1 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Mon, 1 Dec 2025 18:19:23 +0100 Subject: [PATCH 09/15] Process routingRequestId --- .../sdk/server/StreamableHttpServerTransport.kt | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 501b3955..6018a56d 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -170,14 +170,15 @@ public class StreamableHttpServerTransport( } override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) { - val requestId: RequestId? = when (message) { + val responseRequestId: RequestId? = when (message) { is JSONRPCResponse -> message.id is JSONRPCError -> message.id else -> null } + val routingRequestId = responseRequestId ?: options?.relatedRequestId // Standalone SSE stream - if (requestId == null) { + if (routingRequestId == null) { require(message !is JSONRPCResponse && message !is JSONRPCError) { "Cannot send a response on a standalone SSE stream unless resuming a previous client request" } @@ -186,7 +187,8 @@ public class StreamableHttpServerTransport( return } - val streamId = requestToStreamMapping[requestId] ?: error("No connection established for request id $requestId") + val streamId = requestToStreamMapping[routingRequestId] + ?: error("No connection established for request id $routingRequestId") val activeStream = streamsMapping[streamId] if (!enableJsonResponse) { @@ -198,13 +200,13 @@ public class StreamableHttpServerTransport( val isTerminated = message is JSONRPCResponse || message is JSONRPCError if (!isTerminated) return - requestToResponseMapping[requestId] = message + requestToResponseMapping[responseRequestId!!] = message val relatedIds = requestToStreamMapping.filterValues { it == streamId }.keys if (relatedIds.any { it !in requestToResponseMapping }) return streamMutex.withLock { - if (activeStream == null) error("No connection established for request ID: $requestId") + if (activeStream == null) error("No connection established for request ID: $routingRequestId") if (enableJsonResponse) { activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString()) From 24905f02011c622ca168f050ca028216cffab935 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Mon, 1 Dec 2025 18:40:43 +0100 Subject: [PATCH 10/15] Added `getStreamIdForEventId` to `EventStore` and updated `StreamableHttpServerTransport` for stream ID validation during event replay. --- kotlin-sdk-server/api/kotlin-sdk-server.api | 1 + .../server/StreamableHttpServerTransport.kt | 38 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index 93b590a6..b6919845 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -1,4 +1,5 @@ public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore { + public abstract fun getStreamIdForEventId (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 6018a56d..8c698262 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -64,6 +64,12 @@ public interface EventStore { lastEventId: String, sender: suspend (eventId: String, message: JSONRPCMessage) -> Unit, ): String + + /** + * Returns the stream ID associated with [eventId], or null if the event is unknown. + * Default implementation is a no-op which disables extra validation during replay. + */ + public suspend fun getStreamIdForEventId(eventId: String): String? } /** @@ -416,6 +422,38 @@ public class StreamableHttpServerTransport( val call: ApplicationCall = session.call try { + var lookupSupported = true + val lookupStreamId = try { + store.getStreamIdForEventId(lastEventId) + } catch (_: NotImplementedError) { + lookupSupported = false + null + } catch (_: UnsupportedOperationException) { + lookupSupported = false + null + } + + if (lookupSupported) { + val streamId = lookupStreamId + ?: run { + call.reject( + HttpStatusCode.BadRequest, + RPCError.ErrorCode.CONNECTION_CLOSED, + "Invalid event ID format", + ) + return + } + + if (streamId in streamsMapping) { + call.reject( + HttpStatusCode.Conflict, + RPCError.ErrorCode.CONNECTION_CLOSED, + "Conflict: Stream already has an active connection", + ) + return + } + } + call.appendSseHeaders() session.send(data = "") // flush headers immediately From 99800c4194f6fb6b78b3fcf8a949cb9e4c778fd1 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Mon, 1 Dec 2025 18:44:48 +0100 Subject: [PATCH 11/15] api dump --- kotlin-sdk-core/api/kotlin-sdk-core.api | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/kotlin-sdk-core/api/kotlin-sdk-core.api b/kotlin-sdk-core/api/kotlin-sdk-core.api index 6620ba4a..79dd9a04 100644 --- a/kotlin-sdk-core/api/kotlin-sdk-core.api +++ b/kotlin-sdk-core/api/kotlin-sdk-core.api @@ -454,19 +454,12 @@ public final class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolKt { } public class io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions { - public fun ()V - public fun (ZLjava/util/List;)V - public synthetic fun (ZLjava/util/List;ILkotlin/jvm/internal/DefaultConstructorMarker;)V - public final fun component1 ()Z - public final fun component2 ()Ljava/util/List; - public fun copy (ZLjava/util/List;)Lio/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions; - public static synthetic fun copy$default (Lio/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions;ZLjava/util/List;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions; - public fun equals (Ljava/lang/Object;)Z - public final fun getDebouncedNotificationMethods ()Ljava/util/List; + public synthetic fun (ZJILkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun (ZJLkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun getEnforceStrictCapabilities ()Z - public fun hashCode ()I + public final fun getTimeout-UwyO8pc ()J public final fun setEnforceStrictCapabilities (Z)V - public fun toString ()Ljava/lang/String; + public final fun setTimeout-LRDsOJo (J)V } public final class io/modelcontextprotocol/kotlin/sdk/shared/ReadBuffer { From bacf6e82c71cdb6e27ee80e90609d18c6d4a0068 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Mon, 1 Dec 2025 19:02:19 +0100 Subject: [PATCH 12/15] Added `retryIntervalMillis` to `StreamableHttpServerTransport` for SSE reconnection hints and refactored SSE flush logic into `flushSse` function. --- .../sdk/server/StreamableHttpServerTransport.kt | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 8c698262..2f50f80c 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -105,6 +105,8 @@ private data class SessionContext(val session: ServerSSESession?, val call: Appl * If not specified, origin validation is disabled. * @param eventStore Event store for resumability support * If provided, resumability will be enabled, allowing clients to reconnect and resume messages + * @param retryIntervalMillis Retry interval (in milliseconds) advertised via SSE priming events to hint the client when to reconnect. + * Applies only when an [eventStore] is configured. Defaults to `null` (no retry hint). */ @OptIn(ExperimentalUuidApi::class, ExperimentalAtomicApi::class) public class StreamableHttpServerTransport( @@ -113,6 +115,7 @@ public class StreamableHttpServerTransport( private val allowedHosts: List? = null, private val allowedOrigins: List? = null, private val eventStore: EventStore? = null, + private val retryIntervalMillis: Long? = null, ) : AbstractTransport() { public var sessionId: String? = null private set @@ -346,7 +349,7 @@ public class StreamableHttpServerTransport( val streamId = Uuid.random().toString() if (!enableJsonResponse) { call.appendSseHeaders() - session?.send(data = "") // flush headers immediately + flushSse(session) // flush headers immediately } streamMutex.withLock { @@ -406,7 +409,7 @@ public class StreamableHttpServerTransport( } call.appendSseHeaders() - session.send(data = "") // flush headers immediately + flushSse(session) // flush headers immediately streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(session, call) session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) } } @@ -455,7 +458,7 @@ public class StreamableHttpServerTransport( } call.appendSseHeaders() - session.send(data = "") // flush headers immediately + flushSse(session) // flush headers immediately val streamId = store.replayEventsAfter(lastEventId) { eventId, message -> try { @@ -562,6 +565,14 @@ public class StreamableHttpServerTransport( return null } + private suspend fun flushSse(session: ServerSSESession?) { + try { + session?.send(data = "") + } catch (e: Exception) { + _onError(e) + } + } + private suspend fun parseBody(call: ApplicationCall): List? { val contentLength = call.request.header(HttpHeaders.ContentLength)?.toIntOrNull() ?: 0 if (contentLength > MAXIMUM_MESSAGE_SIZE) { From eda4337937ca54788fbcd0e6a857d134e3157b10 Mon Sep 17 00:00:00 2001 From: devcrocod Date: Mon, 1 Dec 2025 19:07:18 +0100 Subject: [PATCH 13/15] Added `closeSseStream` method to `StreamableHttpServerTransport` for managing SSE stream lifecycle and updated API signature. --- kotlin-sdk-server/api/kotlin-sdk-server.api | 6 +++--- .../server/StreamableHttpServerTransport.kt | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index b6919845..f064546f 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -160,8 +160,9 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport { public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String; public fun ()V - public fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;)V - public synthetic fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V + public synthetic fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun getSessionId ()Ljava/lang/String; public final fun handleDeleteRequest (Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -195,4 +196,3 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpServerT public fun (Lio/ktor/server/websocket/WebSocketServerSession;)V public synthetic fun getSession ()Lio/ktor/websocket/WebSocketSession; } - diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index 2f50f80c..aa0b38c1 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -239,6 +239,24 @@ public class StreamableHttpServerTransport( } } + /** + * Closes the SSE stream associated with the given [requestId], prompting the client to reconnect. + * Useful for implementing polling behavior for long-running operations. + */ + public suspend fun closeSseStream(requestId: RequestId) { + if (enableJsonResponse) return + val streamId = requestToStreamMapping[requestId] ?: return + val sessionContext = streamsMapping[streamId] ?: return + + try { + sessionContext.session?.close() + } catch (e: Exception) { + _onError(e) + } finally { + streamsMapping.remove(streamId) + } + } + override suspend fun close() { streamMutex.withLock { streamsMapping.values.forEach { From 613d3fbac1edb3e6d9fff34057446a5251ad34bb Mon Sep 17 00:00:00 2001 From: devcrocod Date: Mon, 1 Dec 2025 20:01:57 +0100 Subject: [PATCH 14/15] Added `PrimingEventMessage` to support SSE stream initialization and integrated it with `StreamableHttpServerTransport`. Updated serializers and API signatures accordingly. --- kotlin-sdk-core/api/kotlin-sdk-core.api | 9 ++++ .../kotlin/sdk/shared/Protocol.kt | 2 + .../kotlin/sdk/types/jsonRpc.kt | 6 +++ .../kotlin/sdk/types/serializers.kt | 1 + kotlin-sdk-server/api/kotlin-sdk-server.api | 3 +- .../server/StreamableHttpServerTransport.kt | 50 ++++++++++++------- 6 files changed, 52 insertions(+), 19 deletions(-) diff --git a/kotlin-sdk-core/api/kotlin-sdk-core.api b/kotlin-sdk-core/api/kotlin-sdk-core.api index 79dd9a04..0919bc07 100644 --- a/kotlin-sdk-core/api/kotlin-sdk-core.api +++ b/kotlin-sdk-core/api/kotlin-sdk-core.api @@ -2947,6 +2947,15 @@ public final class io/modelcontextprotocol/kotlin/sdk/types/PingRequestBuilder : public synthetic fun build$kotlin_sdk_core ()Lio/modelcontextprotocol/kotlin/sdk/types/Request; } +public final class io/modelcontextprotocol/kotlin/sdk/types/PrimingEventMessage : io/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage { + public static final field INSTANCE Lio/modelcontextprotocol/kotlin/sdk/types/PrimingEventMessage; + public fun equals (Ljava/lang/Object;)Z + public fun getJsonrpc ()Ljava/lang/String; + public fun hashCode ()I + public final fun serializer ()Lkotlinx/serialization/KSerializer; + public fun toString ()Ljava/lang/String; +} + public final class io/modelcontextprotocol/kotlin/sdk/types/Progress { public static final field Companion Lio/modelcontextprotocol/kotlin/sdk/types/Progress$Companion; public fun (DLjava/lang/Double;Ljava/lang/String;)V diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt index 7dd04bc0..4e2ea29a 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt @@ -13,6 +13,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.McpJson import io.modelcontextprotocol.kotlin.sdk.types.Method import io.modelcontextprotocol.kotlin.sdk.types.Notification import io.modelcontextprotocol.kotlin.sdk.types.PingRequest +import io.modelcontextprotocol.kotlin.sdk.types.PrimingEventMessage import io.modelcontextprotocol.kotlin.sdk.types.Progress import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification import io.modelcontextprotocol.kotlin.sdk.types.ProgressToken @@ -249,6 +250,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio is JSONRPCRequest -> onRequest(message) is JSONRPCNotification -> onNotification(message) is JSONRPCError -> onResponse(null, message) + is PrimingEventMessage -> Unit } } diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt index cb2c76ad..30bd815f 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt @@ -85,6 +85,12 @@ public sealed interface JSONRPCMessage { public val jsonrpc: String } +@Serializable +public data object PrimingEventMessage : JSONRPCMessage { + @EncodeDefault + override val jsonrpc: String = JSONRPC_VERSION +} + // ============================================================================ // JSONRPCRequest // ============================================================================ diff --git a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt index 721a2aac..f3000105 100644 --- a/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt +++ b/kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt @@ -384,6 +384,7 @@ internal object JSONRPCMessagePolymorphicSerializer : "result" in jsonObject -> JSONRPCResponse.serializer() "method" in jsonObject && "id" in jsonObject -> JSONRPCRequest.serializer() "method" in jsonObject -> JSONRPCNotification.serializer() + jsonObject.isEmpty() || jsonObject.keys == setOf("jsonrpc") -> PrimingEventMessage.serializer() else -> throw SerializationException("Invalid JSONRPCMessage type: ${jsonObject.keys}") } } diff --git a/kotlin-sdk-server/api/kotlin-sdk-server.api b/kotlin-sdk-server/api/kotlin-sdk-server.api index f064546f..16739cc9 100644 --- a/kotlin-sdk-server/api/kotlin-sdk-server.api +++ b/kotlin-sdk-server/api/kotlin-sdk-server.api @@ -162,8 +162,8 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServe public fun ()V public fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V public synthetic fun (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V - public fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun getSessionId ()Ljava/lang/String; public final fun handleDeleteRequest (Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -196,3 +196,4 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpServerT public fun (Lio/ktor/server/websocket/WebSocketServerSession;)V public synthetic fun getSession ()Lio/ktor/websocket/WebSocketSession; } + diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index aa0b38c1..ba3df79e 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -23,6 +23,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse import io.modelcontextprotocol.kotlin.sdk.types.McpJson import io.modelcontextprotocol.kotlin.sdk.types.Method +import io.modelcontextprotocol.kotlin.sdk.types.PrimingEventMessage import io.modelcontextprotocol.kotlin.sdk.types.RPCError import io.modelcontextprotocol.kotlin.sdk.types.RequestId import io.modelcontextprotocol.kotlin.sdk.types.SUPPORTED_PROTOCOL_VERSIONS @@ -239,24 +240,6 @@ public class StreamableHttpServerTransport( } } - /** - * Closes the SSE stream associated with the given [requestId], prompting the client to reconnect. - * Useful for implementing polling behavior for long-running operations. - */ - public suspend fun closeSseStream(requestId: RequestId) { - if (enableJsonResponse) return - val streamId = requestToStreamMapping[requestId] ?: return - val sessionContext = streamsMapping[streamId] ?: return - - try { - sessionContext.session?.close() - } catch (e: Exception) { - _onError(e) - } finally { - streamsMapping.remove(streamId) - } - } - override suspend fun close() { streamMutex.withLock { streamsMapping.values.forEach { @@ -368,6 +351,7 @@ public class StreamableHttpServerTransport( if (!enableJsonResponse) { call.appendSseHeaders() flushSse(session) // flush headers immediately + maybeSendPrimingEvent(streamId, session) } streamMutex.withLock { @@ -429,6 +413,7 @@ public class StreamableHttpServerTransport( call.appendSseHeaders() flushSse(session) // flush headers immediately streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(session, call) + maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, session) session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) } } @@ -439,6 +424,24 @@ public class StreamableHttpServerTransport( call.respondNullable(status = HttpStatusCode.OK, message = null) } + /** + * Closes the SSE stream associated with the given [requestId], prompting the client to reconnect. + * Useful for implementing polling behavior for long-running operations. + */ + public suspend fun closeSseStream(requestId: RequestId) { + if (enableJsonResponse) return + val streamId = requestToStreamMapping[requestId] ?: return + val sessionContext = streamsMapping[streamId] ?: return + + try { + sessionContext.session?.close() + } catch (e: Exception) { + _onError(e) + } finally { + streamsMapping.remove(streamId) + } + } + private suspend fun replayEvents(store: EventStore, lastEventId: String, session: ServerSSESession) { val call: ApplicationCall = session.call @@ -645,6 +648,17 @@ public class StreamableHttpServerTransport( } } + private suspend fun maybeSendPrimingEvent(streamId: String, session: ServerSSESession?) { + val store = eventStore ?: return + val sseSession = session ?: return + try { + val primingEventId = store.storeEvent(streamId, PrimingEventMessage) + sseSession.send(id = primingEventId, retry = retryIntervalMillis, data = "") + } catch (e: Exception) { + _onError(e) + } + } + private fun ApplicationCall.appendSseHeaders() { this.response.headers.append(HttpHeaders.ContentType, ContentType.Text.EventStream.toString()) this.response.headers.append(HttpHeaders.CacheControl, "no-cache, no-transform") From c1ad3d2360e3855c1dd8201b61d816501084d5cc Mon Sep 17 00:00:00 2001 From: devcrocod Date: Thu, 4 Dec 2025 12:23:58 +0100 Subject: [PATCH 15/15] Refactored `StreamableHttpServerTransport` to improve SSE session handling, validate header values, and simplify MIME type matching logic. --- .../server/StreamableHttpServerTransport.kt | 64 +++++++++++-------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt index ba3df79e..d1b9ce47 100644 --- a/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt +++ b/kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt @@ -249,6 +249,7 @@ public class StreamableHttpServerTransport( } } streamsMapping.clear() + requestToStreamMapping.clear() requestToResponseMapping.clear() _onClose() } @@ -380,7 +381,7 @@ public class StreamableHttpServerTransport( ) return } - session!! + val sseSession = session ?: error("Server session can't be null for streaming GET requests") val acceptHeader = call.request.header(HttpHeaders.Accept) if (!acceptHeader.accepts(ContentType.Text.EventStream)) { @@ -396,7 +397,7 @@ public class StreamableHttpServerTransport( eventStore?.let { store -> call.request.header(MCP_RESUMPTION_TOKEN_HEADER)?.let { lastEventId -> - replayEvents(store, lastEventId, session) + replayEvents(store, lastEventId, sseSession) return } } @@ -411,10 +412,12 @@ public class StreamableHttpServerTransport( } call.appendSseHeaders() - flushSse(session) // flush headers immediately - streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(session, call) - maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, session) - session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) } + flushSse(sseSession) // flush headers immediately + streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(sseSession, call) + maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession) + sseSession.coroutineContext.job.invokeOnCompletion { + streamsMapping.remove(STANDALONE_SSE_STREAM_ID) + } } public suspend fun handleDeleteRequest(call: ApplicationCall) { @@ -516,19 +519,32 @@ public class StreamableHttpServerTransport( return false } - val headerId = call.request.header(MCP_SESSION_ID_HEADER) + val sessionHeaderValues = call.request.headers.getAll(MCP_SESSION_ID_HEADER) - return when { - headerId == null -> { - call.reject( - HttpStatusCode.BadRequest, - RPCError.ErrorCode.CONNECTION_CLOSED, - "Bad Request: Mcp-Session-Id header is required", - ) - false - } + if (sessionHeaderValues.isNullOrEmpty()) { + call.reject( + HttpStatusCode.BadRequest, + RPCError.ErrorCode.CONNECTION_CLOSED, + "Bad Request: Mcp-Session-Id header is required", + ) + return false + } - headerId != sessionId -> { + if (sessionHeaderValues.size > 1) { + call.reject( + HttpStatusCode.BadRequest, + RPCError.ErrorCode.CONNECTION_CLOSED, + "Bad Request: Mcp-Session-Id header must be a single value", + ) + return false + } + + val headerId = sessionHeaderValues.single() + + return when (headerId) { + sessionId -> true + + else -> { call.reject( HttpStatusCode.NotFound, -32001, @@ -536,13 +552,12 @@ public class StreamableHttpServerTransport( ) false } - - else -> true } } private suspend fun validateProtocolVersion(call: ApplicationCall): Boolean { - val version = call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION + val protocolVersions = call.request.headers.getAll(MCP_PROTOCOL_VERSION_HEADER) + val version = protocolVersions?.lastOrNull() ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION return when (version) { !in SUPPORTED_PROTOCOL_VERSIONS -> { @@ -631,13 +646,8 @@ public class StreamableHttpServerTransport( } } - private fun String?.accepts(mime: ContentType): Boolean { - if (this == null) return false - - val escaped = Regex.escape(mime.toString()) - val pattern = Regex("""(^|,\s*)$escaped(\s*(;|,|$))""", RegexOption.IGNORE_CASE) - return pattern.containsMatchIn(this) - } + private fun String?.accepts(mime: ContentType): Boolean = + this?.lowercase()?.contains(mime.toString().lowercase()) == true private suspend fun emitOnStream(streamId: String, session: ServerSSESession?, message: JSONRPCMessage) { val eventId = eventStore?.storeEvent(streamId, message)