Skip to content

Commit 8532154

Browse files
committed
Added closeSseStream method to StreamableHttpServerTransport for managing SSE stream lifecycle and updated API signature.
1 parent e8600ca commit 8532154

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,9 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StdioServerTranspor
160160
public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
161161
public static final field STANDALONE_SSE_STREAM_ID Ljava/lang/String;
162162
public fun <init> ()V
163-
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;)V
164-
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
163+
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V
164+
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
165+
public fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
165166
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
166167
public final fun getSessionId ()Ljava/lang/String;
167168
public final fun handleDeleteRequest (Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -195,4 +196,3 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpServerT
195196
public fun <init> (Lio/ktor/server/websocket/WebSocketServerSession;)V
196197
public synthetic fun getSession ()Lio/ktor/websocket/WebSocketSession;
197198
}
198-

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,24 @@ public class StreamableHttpServerTransport(
239239
}
240240
}
241241

242+
/**
243+
* Closes the SSE stream associated with the given [requestId], prompting the client to reconnect.
244+
* Useful for implementing polling behavior for long-running operations.
245+
*/
246+
public suspend fun closeSseStream(requestId: RequestId) {
247+
if (enableJsonResponse) return
248+
val streamId = requestToStreamMapping[requestId] ?: return
249+
val sessionContext = streamsMapping[streamId] ?: return
250+
251+
try {
252+
sessionContext.session?.close()
253+
} catch (e: Exception) {
254+
_onError(e)
255+
} finally {
256+
streamsMapping.remove(streamId)
257+
}
258+
}
259+
242260
override suspend fun close() {
243261
streamMutex.withLock {
244262
streamsMapping.values.forEach {

0 commit comments

Comments
 (0)