Skip to content

Commit 47768e2

Browse files
zsheadevcrocod
authored andcommitted
Add Streamable Http Transport
1 parent 474a21e commit 47768e2

File tree

3 files changed

+736
-0
lines changed

3 files changed

+736
-0
lines changed

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,20 @@
1+
public final class io/modelcontextprotocol/kotlin/sdk/LibVersionKt {
2+
public static final field LIB_VERSION Ljava/lang/String;
3+
}
4+
5+
public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore {
6+
public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7+
public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
8+
}
9+
110
public final class io/modelcontextprotocol/kotlin/sdk/server/KtorServerKt {
211
public static final fun mcp (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function1;)V
312
public static final fun mcp (Lio/ktor/server/routing/Routing;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V
413
public static final fun mcp (Lio/ktor/server/routing/Routing;Lkotlin/jvm/functions/Function1;)V
14+
public static final fun mcpStatelessStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
15+
public static synthetic fun mcpStatelessStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
16+
public static final fun mcpStreamableHttp (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;)V
17+
public static synthetic fun mcpStreamableHttp$default (Lio/ktor/server/application/Application;ZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
518
}
619

720
public final class io/modelcontextprotocol/kotlin/sdk/server/RegisteredPrompt : io/modelcontextprotocol/kotlin/sdk/server/Feature {
@@ -147,6 +160,24 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
147160
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
148161
}
149162

163+
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
164+
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
165+
public fun <init> ()V
166+
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;)V
167+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
168+
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
169+
public final fun getSessionId ()Ljava/lang/String;
170+
public final fun handleDeleteRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
171+
public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
172+
public final fun handlePostRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
173+
public final fun handleRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
174+
public fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
175+
public final fun setOnSessionClosed (Lkotlin/jvm/functions/Function1;)V
176+
public final fun setOnSessionInitialized (Lkotlin/jvm/functions/Function1;)V
177+
public final fun setSessionIdGenerator (Lkotlin/jvm/functions/Function0;)V
178+
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
179+
}
180+
150181
public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpKtorServerExtensionsKt {
151182
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Ljava/lang/String;Lkotlin/jvm/functions/Function0;)V
152183
public static final fun mcpWebSocket (Lio/ktor/server/application/Application;Lkotlin/jvm/functions/Function0;)V

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/KtorServer.kt

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
44
import io.ktor.http.HttpStatusCode
55
import io.ktor.server.application.Application
66
import io.ktor.server.application.install
7+
import io.ktor.server.request.header
78
import io.ktor.server.response.respond
89
import io.ktor.server.routing.Routing
910
import io.ktor.server.routing.RoutingContext
@@ -13,13 +14,15 @@ import io.ktor.server.routing.routing
1314
import io.ktor.server.sse.SSE
1415
import io.ktor.server.sse.ServerSSESession
1516
import io.ktor.server.sse.sse
17+
import io.ktor.util.collections.ConcurrentMap
1618
import io.ktor.utils.io.KtorDsl
1719
import kotlinx.atomicfu.AtomicRef
1820
import kotlinx.atomicfu.atomic
1921
import kotlinx.atomicfu.update
2022
import kotlinx.collections.immutable.PersistentMap
2123
import kotlinx.collections.immutable.toPersistentMap
2224
import kotlinx.coroutines.awaitCancellation
25+
import io.modelcontextprotocol.kotlin.sdk.ErrorCode
2326

2427
private val logger = KotlinLogging.logger {}
2528

@@ -69,6 +72,51 @@ public fun Application.mcp(block: ServerSSESession.() -> Server) {
6972
}
7073
}
7174

75+
@KtorDsl
76+
public fun Application.mcpStreamableHttp(
77+
enableDnsRebindingProtection: Boolean = false,
78+
allowedHosts: List<String>? = null,
79+
allowedOrigins: List<String>? = null,
80+
eventStore: EventStore? = null,
81+
block: RoutingContext.() -> Server,
82+
) {
83+
val transports = ConcurrentMap<String, StreamableHttpServerTransport>()
84+
85+
routing {
86+
post("/mcp") {
87+
mcpStreamableHttpEndpoint(
88+
transports,
89+
enableDnsRebindingProtection,
90+
allowedHosts,
91+
allowedOrigins,
92+
eventStore,
93+
block,
94+
)
95+
}
96+
}
97+
}
98+
99+
@KtorDsl
100+
public fun Application.mcpStatelessStreamableHttp(
101+
enableDnsRebindingProtection: Boolean = false,
102+
allowedHosts: List<String>? = null,
103+
allowedOrigins: List<String>? = null,
104+
eventStore: EventStore? = null,
105+
block: RoutingContext.() -> Server,
106+
) {
107+
routing {
108+
post("/mcp") {
109+
mcpStatelessStreamableHttpEndpoint(
110+
enableDnsRebindingProtection,
111+
allowedHosts,
112+
allowedOrigins,
113+
eventStore,
114+
block,
115+
)
116+
}
117+
}
118+
}
119+
72120
internal suspend fun ServerSSESession.mcpSseEndpoint(
73121
postEndpoint: String,
74122
sseTransportManager: SseTransportManager,
@@ -101,6 +149,88 @@ internal fun ServerSSESession.mcpSseTransport(
101149
return transport
102150
}
103151

152+
internal suspend fun RoutingContext.mcpStreamableHttpEndpoint(
153+
transports: ConcurrentMap<String, StreamableHttpServerTransport>,
154+
enableDnsRebindingProtection: Boolean = false,
155+
allowedHosts: List<String>? = null,
156+
allowedOrigins: List<String>? = null,
157+
eventStore: EventStore? = null,
158+
block: RoutingContext.() -> Server,
159+
) {
160+
val sessionId = this.call.request.header(MCP_SESSION_ID_HEADER)
161+
val transport = if (sessionId != null && transports.containsKey(sessionId)) {
162+
transports[sessionId]!!
163+
} else if (sessionId == null) {
164+
val transport = StreamableHttpServerTransport(
165+
enableDnsRebindingProtection = enableDnsRebindingProtection,
166+
allowedHosts = allowedHosts,
167+
allowedOrigins = allowedOrigins,
168+
eventStore = eventStore,
169+
enableJsonResponse = true,
170+
)
171+
172+
transport.setOnSessionInitialized { sessionId ->
173+
transports[sessionId] = transport
174+
175+
logger.info { "New StreamableHttp connection established and stored with sessionId: $sessionId" }
176+
}
177+
178+
val server = block()
179+
server.onClose {
180+
logger.info { "Server connection closed for sessionId: ${transport.sessionId}" }
181+
}
182+
183+
server.connect(transport)
184+
185+
transport
186+
} else {
187+
null
188+
}
189+
190+
if (transport == null) {
191+
this.call.reject(
192+
HttpStatusCode.BadRequest,
193+
ErrorCode.Unknown(-32000),
194+
"Bad Request: No valid session ID provided",
195+
)
196+
return
197+
}
198+
199+
transport.handleRequest(null, this.call)
200+
logger.debug { "Server connected to transport for sessionId: ${transport.sessionId}" }
201+
}
202+
203+
internal suspend fun RoutingContext.mcpStatelessStreamableHttpEndpoint(
204+
enableDnsRebindingProtection: Boolean = false,
205+
allowedHosts: List<String>? = null,
206+
allowedOrigins: List<String>? = null,
207+
eventStore: EventStore? = null,
208+
block: RoutingContext.() -> Server,
209+
) {
210+
val transport = StreamableHttpServerTransport(
211+
enableDnsRebindingProtection = enableDnsRebindingProtection,
212+
allowedHosts = allowedHosts,
213+
allowedOrigins = allowedOrigins,
214+
eventStore = eventStore,
215+
enableJsonResponse = true,
216+
)
217+
transport.setSessionIdGenerator(null)
218+
219+
logger.info { "New stateless StreamableHttp connection established without sessionId" }
220+
221+
val server = block()
222+
223+
server.onClose {
224+
logger.info { "Server connection closed without sessionId" }
225+
}
226+
227+
server.connect(transport)
228+
229+
transport.handleRequest(null, this.call)
230+
231+
logger.debug { "Server connected to transport without sessionId" }
232+
}
233+
104234
internal suspend fun RoutingContext.mcpPostEndpoint(sseTransportManager: SseTransportManager) {
105235
val sessionId: String = call.request.queryParameters["sessionId"] ?: run {
106236
call.respond(HttpStatusCode.BadRequest, "sessionId query parameter is not provided")

0 commit comments

Comments
 (0)