Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kotlin-sdk-core/api/kotlin-sdk-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ public final class io/modelcontextprotocol/kotlin/sdk/types/ClientResult$Default
}

public final class io/modelcontextprotocol/kotlin/sdk/types/CommonKt {
public static final field DEFAULT_NEGOTIATED_PROTOCOL_VERSION Ljava/lang/String;
public static final field LATEST_PROTOCOL_VERSION Ljava/lang/String;
public static final fun ProgressToken (J)Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;
public static final fun ProgressToken (Ljava/lang/String;)Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;
Expand Down Expand Up @@ -2946,6 +2947,15 @@ public final class io/modelcontextprotocol/kotlin/sdk/types/PingRequestBuilder :
public synthetic fun build$kotlin_sdk_core ()Lio/modelcontextprotocol/kotlin/sdk/types/Request;
}

public final class io/modelcontextprotocol/kotlin/sdk/types/PrimingEventMessage : io/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage {
public static final field INSTANCE Lio/modelcontextprotocol/kotlin/sdk/types/PrimingEventMessage;
public fun equals (Ljava/lang/Object;)Z
public fun getJsonrpc ()Ljava/lang/String;
public fun hashCode ()I
public final fun serializer ()Lkotlinx/serialization/KSerializer;
public fun toString ()Ljava/lang/String;
}

public final class io/modelcontextprotocol/kotlin/sdk/types/Progress {
public static final field Companion Lio/modelcontextprotocol/kotlin/sdk/types/Progress$Companion;
public fun <init> (DLjava/lang/Double;Ljava/lang/String;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.McpJson
import io.modelcontextprotocol.kotlin.sdk.types.Method
import io.modelcontextprotocol.kotlin.sdk.types.Notification
import io.modelcontextprotocol.kotlin.sdk.types.PingRequest
import io.modelcontextprotocol.kotlin.sdk.types.PrimingEventMessage
import io.modelcontextprotocol.kotlin.sdk.types.Progress
import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification
import io.modelcontextprotocol.kotlin.sdk.types.ProgressToken
Expand Down Expand Up @@ -249,6 +250,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
is JSONRPCRequest -> onRequest(message)
is JSONRPCNotification -> onNotification(message)
is JSONRPCError -> onResponse(null, message)
is PrimingEventMessage -> Unit
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.modelcontextprotocol.kotlin.sdk.types

import io.modelcontextprotocol.kotlin.sdk.types.Icon.Theme.Dark
import io.modelcontextprotocol.kotlin.sdk.types.Icon.Theme.Light
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject
Expand All @@ -12,6 +10,8 @@ import kotlinx.serialization.json.JsonObject

public const val LATEST_PROTOCOL_VERSION: String = "2025-06-18"

public const val DEFAULT_NEGOTIATED_PROTOCOL_VERSION: String = "2025-03-26"

public val SUPPORTED_PROTOCOL_VERSIONS: List<String> = listOf(
LATEST_PROTOCOL_VERSION,
"2025-03-26",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public sealed interface JSONRPCMessage {
public val jsonrpc: String
}

@Serializable
public data object PrimingEventMessage : JSONRPCMessage {
@EncodeDefault
override val jsonrpc: String = JSONRPC_VERSION
}

Comment on lines +88 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is it for?

// ============================================================================
// JSONRPCRequest
// ============================================================================
Expand Down Expand Up @@ -197,7 +203,7 @@ public data class JSONRPCResponse(val id: RequestId, val result: RequestResult =
* @property error Details about the error that occurred, including error code and message.
*/
@Serializable
public data class JSONRPCError(val id: RequestId, val error: RPCError) : JSONRPCMessage {
public data class JSONRPCError(val id: RequestId?, val error: RPCError) : JSONRPCMessage {
@EncodeDefault
override val jsonrpc: String = JSONRPC_VERSION
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ internal object JSONRPCMessagePolymorphicSerializer :
"result" in jsonObject -> JSONRPCResponse.serializer()
"method" in jsonObject && "id" in jsonObject -> JSONRPCRequest.serializer()
"method" in jsonObject -> JSONRPCNotification.serializer()
jsonObject.isEmpty() || jsonObject.keys == setOf("jsonrpc") -> PrimingEventMessage.serializer()
else -> throw SerializationException("Invalid JSONRPCMessage type: ${jsonObject.keys}")
}
}
Expand Down
29 changes: 29 additions & 0 deletions kotlin-sdk-server/api/kotlin-sdk-server.api
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore {
public abstract fun getStreamIdForEventId (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/modelcontextprotocol/kotlin/sdk/server/KtorServerKt {
public static final fun mcp (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V
public static final fun mcp (Lio/ktor/server/routing/Routing;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V
public static final fun mcp (Lio/ktor/server/routing/Routing;Lkotlin/jvm/functions/Function1;)V
public static final fun mcpStatelessStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun mcpStatelessStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public static final fun mcpStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun mcpStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
}

public final class io/modelcontextprotocol/kotlin/sdk/server/RegisteredPrompt : io/modelcontextprotocol/kotlin/sdk/server/Feature {
Expand Down Expand Up @@ -147,6 +157,25 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
public fun <init> ()V
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getSessionId ()Ljava/lang/String;
public final fun handleDeleteRequest (Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun send (Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lio/modelcontextprotocol/kotlin/sdk/shared/TransportSendOptions;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V
public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V
public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt {
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function0;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.http.HttpStatusCode
import io.ktor.server.application.Application
import io.ktor.server.application.install
import io.ktor.server.request.header
import io.ktor.server.response.respond
import io.ktor.server.routing.Routing
import io.ktor.server.routing.RoutingContext
Expand All @@ -14,6 +15,8 @@ import io.ktor.server.sse.SSE
import io.ktor.server.sse.ServerSSESession
import io.ktor.server.sse.sse
import io.ktor.utils.io.KtorDsl
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
import io.modelcontextprotocol.kotlin.sdk.types.RPCError
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
Expand All @@ -23,13 +26,15 @@ import kotlinx.coroutines.awaitCancellation

private val logger = KotlinLogging.logger {}

internal class SseTransportManager(transports: Map<String, SseServerTransport> = emptyMap()) {
private val transports: AtomicRef<PersistentMap<String, SseServerTransport>> = atomic(transports.toPersistentMap())
internal class TransportManager(transports: Map<String, AbstractTransport> = emptyMap()) {
private val transports: AtomicRef<PersistentMap<String, AbstractTransport>> = atomic(transports.toPersistentMap())

fun getTransport(sessionId: String): SseServerTransport? = transports.value[sessionId]
fun hasTransport(sessionId: String): Boolean = transports.value.containsKey(sessionId)

fun addTransport(transport: SseServerTransport) {
transports.update { it.put(transport.sessionId, transport) }
fun getTransport(sessionId: String): AbstractTransport? = transports.value[sessionId]

fun addTransport(sessionId: String, transport: AbstractTransport) {
transports.update { it.put(sessionId, transport) }
}

fun removeTransport(sessionId: String) {
Expand All @@ -49,14 +54,14 @@ public fun Routing.mcp(path: String, block: ServerSSESession.() -> Server) {
*/
@KtorDsl
public fun Routing.mcp(block: ServerSSESession.() -> Server) {
val sseTransportManager = SseTransportManager()
val transportManager = TransportManager()

sse {
mcpSseEndpoint("", sseTransportManager, block)
mcpSseEndpoint("", transportManager, block)
}

post {
mcpPostEndpoint(sseTransportManager)
mcpPostEndpoint(transportManager)
}
}

Expand All @@ -69,18 +74,71 @@ public fun Application.mcp(block: ServerSSESession.() -> Server) {
}
}

/*
* Configures the Ktor Application to handle Model Context Protocol (MCP) over Streamable Http.
* It currently only works with JSON response.
*/
@KtorDsl
public fun Application.mcpStreamableHttp(
enableDnsRebindingProtection: Boolean = false,
allowedHosts: List<String>? = null,
allowedOrigins: List<String>? = null,
eventStore: EventStore? = null,
block: RoutingContext.() -> Server,
) {
val transportManager = TransportManager()

routing {
post("/mcp") {
mcpStreamableHttpEndpoint(
transportManager,
enableDnsRebindingProtection,
allowedHosts,
allowedOrigins,
eventStore,
block,
)
}
}
}

/*
* Configures the Ktor Application to handle Model Context Protocol (MCP) over stateless Streamable Http.
* It currently only works with JSON response.
*/
@KtorDsl
public fun Application.mcpStatelessStreamableHttp(
enableDnsRebindingProtection: Boolean = false,
allowedHosts: List<String>? = null,
allowedOrigins: List<String>? = null,
eventStore: EventStore? = null,
block: RoutingContext.() -> Server,
) {
routing {
post("/mcp") {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get requests also need to be processed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't implement because it's not needed for json response. Once we add SSE back, we can do it then.

Copy link

@dvilker dvilker Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't implement because it's not needed for json response. Once we add SSE back, we can do it then.

https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http

The server MUST either return Content-Type: text/event-stream in response to this HTTP GET, or else return HTTP 405 Method Not Allowed, indicating that the server does not offer an SSE stream at this endpoint

Without processing (which responds with code 405), it seems the inspector was spamming errors.

I replaced post(... with route(".... Everything else has already been implemented by you.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm drafting a PR to return 405 for the stateless extension on GET requests, as per spec

The server MUST either return Content-Type: text/event-stream in response to this HTTP GET, or else return HTTP 405 Method Not Allowed, indicating that the server does not offer an SSE stream at this endpoint.

mcpStatelessStreamableHttpEndpoint(
enableDnsRebindingProtection,
allowedHosts,
allowedOrigins,
eventStore,
block,
)
}
}
}

internal suspend fun ServerSSESession.mcpSseEndpoint(
postEndpoint: String,
sseTransportManager: SseTransportManager,
transportManager: TransportManager,
block: ServerSSESession.() -> Server,
) {
val transport = mcpSseTransport(postEndpoint, sseTransportManager)
val transport = mcpSseTransport(postEndpoint, transportManager)

val server = block()

server.onClose {
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
sseTransportManager.removeTransport(transport.sessionId)
transportManager.removeTransport(transport.sessionId)
}

server.createSession(transport)
Expand All @@ -92,24 +150,106 @@ internal suspend fun ServerSSESession.mcpSseEndpoint(

internal fun ServerSSESession.mcpSseTransport(
postEndpoint: String,
sseTransportManager: SseTransportManager,
transportManager: TransportManager,
): SseServerTransport {
val transport = SseServerTransport(postEndpoint, this)
sseTransportManager.addTransport(transport)
transportManager.addTransport(transport.sessionId, transport)
logger.info { "New SSE connection established and stored with sessionId: ${transport.sessionId}" }

return transport
}

internal suspend fun RoutingContext.mcpPostEndpoint(sseTransportManager: SseTransportManager) {
internal suspend fun RoutingContext.mcpStreamableHttpEndpoint(
transportManager: TransportManager,
enableDnsRebindingProtection: Boolean = false,
allowedHosts: List<String>? = null,
allowedOrigins: List<String>? = null,
eventStore: EventStore? = null,
block: RoutingContext.() -> Server,
) {
val sessionId = this.call.request.header(MCP_SESSION_ID_HEADER)
val transport = if (sessionId != null && transportManager.hasTransport(sessionId)) {
transportManager.getTransport(sessionId)
} else if (sessionId == null) {
val transport = StreamableHttpServerTransport(
enableDnsRebindingProtection = enableDnsRebindingProtection,
allowedHosts = allowedHosts,
allowedOrigins = allowedOrigins,
eventStore = eventStore,
enableJsonResponse = true,
)

transport.setOnSessionInitialized { sessionId ->
transportManager.addTransport(sessionId, transport)

logger.info { "New StreamableHttp connection established and stored with sessionId: $sessionId" }
}

val server = block()
server.onClose {
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
}

server.createSession(transport)

transport
} else {
null
}

if (transport == null) {
this.call.reject(
HttpStatusCode.BadRequest,
RPCError.ErrorCode.CONNECTION_CLOSED,
"Bad Request: No valid session ID provided",
)
return
}

(transport as StreamableHttpServerTransport).handleRequest(null, this.call)
logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
}

internal suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint(
enableDnsRebindingProtection: Boolean = false,
allowedHosts: List<String>? = null,
allowedOrigins: List<String>? = null,
eventStore: EventStore? = null,
block: RoutingContext.() -> Server,
) {
val transport = StreamableHttpServerTransport(
enableDnsRebindingProtection = enableDnsRebindingProtection,
allowedHosts = allowedHosts,
allowedOrigins = allowedOrigins,
eventStore = eventStore,
enableJsonResponse = true,
)
transport.setSessionIdGenerator(null)

logger.info { "New stateless StreamableHttp connection established without sessionId" }

val server = block()

server.onClose {
logger.info { "Server connection closed without sessionId" }
}

server.createSession(transport)

transport.handleRequest(null, this.call)

logger.debug { "Server connected to transport without sessionId" }
}

internal suspend fun RoutingContext.mcpPostEndpoint(transportManager: TransportManager) {
val sessionId: String = call.request.queryParameters["sessionId"] ?: run {
call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided")
return
}

logger.debug { "Received message for sessionId: $sessionId" }

val transport = sseTransportManager.getTransport(sessionId)
val transport = transportManager.getTransport(sessionId) as SseServerTransport?
if (transport == null) {
logger.warn { "Session not found for sessionId: $sessionId" }
call.respond(HttpStatusCode.NotFound, "Session not found")
Expand Down
Loading