Skip to content

Commit 5de8d44

Browse files
committed
Added PrimingEventMessage to support SSE stream initialization and integrated it with StreamableHttpServerTransport. Updated serializers and API signatures accordingly.
1 parent 8532154 commit 5de8d44

File tree

6 files changed

+52
-19
lines changed

6 files changed

+52
-19
lines changed

kotlin-sdk-core/api/kotlin-sdk-core.api

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2947,6 +2947,15 @@ public final class io/modelcontextprotocol/kotlin/sdk/types/PingRequestBuilder :
29472947
public synthetic fun build$kotlin_sdk_core ()Lio/modelcontextprotocol/kotlin/sdk/types/Request;
29482948
}
29492949

2950+
public final class io/modelcontextprotocol/kotlin/sdk/types/PrimingEventMessage : io/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage {
2951+
public static final field INSTANCE Lio/modelcontextprotocol/kotlin/sdk/types/PrimingEventMessage;
2952+
public fun equals (Ljava/lang/Object;)Z
2953+
public fun getJsonrpc ()Ljava/lang/String;
2954+
public fun hashCode ()I
2955+
public final fun serializer ()Lkotlinx/serialization/KSerializer;
2956+
public fun toString ()Ljava/lang/String;
2957+
}
2958+
29502959
public final class io/modelcontextprotocol/kotlin/sdk/types/Progress {
29512960
public static final field Companion Lio/modelcontextprotocol/kotlin/sdk/types/Progress$Companion;
29522961
public fun <init> (DLjava/lang/Double;Ljava/lang/String;)V

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/shared/Protocol.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.McpJson
1313
import io.modelcontextprotocol.kotlin.sdk.types.Method
1414
import io.modelcontextprotocol.kotlin.sdk.types.Notification
1515
import io.modelcontextprotocol.kotlin.sdk.types.PingRequest
16+
import io.modelcontextprotocol.kotlin.sdk.types.PrimingEventMessage
1617
import io.modelcontextprotocol.kotlin.sdk.types.Progress
1718
import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification
1819
import io.modelcontextprotocol.kotlin.sdk.types.ProgressToken
@@ -249,6 +250,7 @@ public abstract class Protocol(@PublishedApi internal val options: ProtocolOptio
249250
is JSONRPCRequest -> onRequest(message)
250251
is JSONRPCNotification -> onNotification(message)
251252
is JSONRPCError -> onResponse(null, message)
253+
is PrimingEventMessage -> Unit
252254
}
253255
}
254256

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/jsonRpc.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ public sealed interface JSONRPCMessage {
8585
public val jsonrpc: String
8686
}
8787

88+
@Serializable
89+
public data object PrimingEventMessage : JSONRPCMessage {
90+
@EncodeDefault
91+
override val jsonrpc: String = JSONRPC_VERSION
92+
}
93+
8894
// ============================================================================
8995
// JSONRPCRequest
9096
// ============================================================================

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/serializers.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ internal object JSONRPCMessagePolymorphicSerializer :
384384
"result" in jsonObject -> JSONRPCResponse.serializer()
385385
"method" in jsonObject && "id" in jsonObject -> JSONRPCRequest.serializer()
386386
"method" in jsonObject -> JSONRPCNotification.serializer()
387+
jsonObject.isEmpty() || jsonObject.keys == setOf("jsonrpc") -> PrimingEventMessage.serializer()
387388
else -> throw SerializationException("Invalid JSONRPCMessage type: ${jsonObject.keys}")
388389
}
389390
}

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServe
162162
public fun <init> ()V
163163
public fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;)V
164164
public synthetic fun <init> (ZZLjava/util/List;Ljava/util/List;Lio/modelcontextprotocol/kotlin/sdk/server/EventStore;Ljava/lang/Long;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
165-
public fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
166165
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
166+
public final fun closeSseStream (Lio/modelcontextprotocol/kotlin/sdk/types/RequestId;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
167167
public final fun getSessionId ()Ljava/lang/String;
168168
public final fun handleDeleteRequest (Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
169169
public final fun handleGetRequest (Lio/ktor/server/sse/ServerSSESession;Lio/ktor/server/application/ApplicationCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -196,3 +196,4 @@ public final class io/modelcontextprotocol/kotlin/sdk/server/WebSocketMcpServerT
196196
public fun <init> (Lio/ktor/server/websocket/WebSocketServerSession;)V
197197
public synthetic fun getSession ()Lio/ktor/websocket/WebSocketSession;
198198
}
199+

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest
2323
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse
2424
import io.modelcontextprotocol.kotlin.sdk.types.McpJson
2525
import io.modelcontextprotocol.kotlin.sdk.types.Method
26+
import io.modelcontextprotocol.kotlin.sdk.types.PrimingEventMessage
2627
import io.modelcontextprotocol.kotlin.sdk.types.RPCError
2728
import io.modelcontextprotocol.kotlin.sdk.types.RequestId
2829
import io.modelcontextprotocol.kotlin.sdk.types.SUPPORTED_PROTOCOL_VERSIONS
@@ -239,24 +240,6 @@ public class StreamableHttpServerTransport(
239240
}
240241
}
241242

242-
/**
243-
* Closes the SSE stream associated with the given [requestId], prompting the client to reconnect.
244-
* Useful for implementing polling behavior for long-running operations.
245-
*/
246-
public suspend fun closeSseStream(requestId: RequestId) {
247-
if (enableJsonResponse) return
248-
val streamId = requestToStreamMapping[requestId] ?: return
249-
val sessionContext = streamsMapping[streamId] ?: return
250-
251-
try {
252-
sessionContext.session?.close()
253-
} catch (e: Exception) {
254-
_onError(e)
255-
} finally {
256-
streamsMapping.remove(streamId)
257-
}
258-
}
259-
260243
override suspend fun close() {
261244
streamMutex.withLock {
262245
streamsMapping.values.forEach {
@@ -368,6 +351,7 @@ public class StreamableHttpServerTransport(
368351
if (!enableJsonResponse) {
369352
call.appendSseHeaders()
370353
flushSse(session) // flush headers immediately
354+
maybeSendPrimingEvent(streamId, session)
371355
}
372356

373357
streamMutex.withLock {
@@ -429,6 +413,7 @@ public class StreamableHttpServerTransport(
429413
call.appendSseHeaders()
430414
flushSse(session) // flush headers immediately
431415
streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(session, call)
416+
maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, session)
432417
session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) }
433418
}
434419

@@ -439,6 +424,24 @@ public class StreamableHttpServerTransport(
439424
call.respondNullable(status = HttpStatusCode.OK, message = null)
440425
}
441426

427+
/**
428+
* Closes the SSE stream associated with the given [requestId], prompting the client to reconnect.
429+
* Useful for implementing polling behavior for long-running operations.
430+
*/
431+
public suspend fun closeSseStream(requestId: RequestId) {
432+
if (enableJsonResponse) return
433+
val streamId = requestToStreamMapping[requestId] ?: return
434+
val sessionContext = streamsMapping[streamId] ?: return
435+
436+
try {
437+
sessionContext.session?.close()
438+
} catch (e: Exception) {
439+
_onError(e)
440+
} finally {
441+
streamsMapping.remove(streamId)
442+
}
443+
}
444+
442445
private suspend fun replayEvents(store: EventStore, lastEventId: String, session: ServerSSESession) {
443446
val call: ApplicationCall = session.call
444447

@@ -645,6 +648,17 @@ public class StreamableHttpServerTransport(
645648
}
646649
}
647650

651+
private suspend fun maybeSendPrimingEvent(streamId: String, session: ServerSSESession?) {
652+
val store = eventStore ?: return
653+
val sseSession = session ?: return
654+
try {
655+
val primingEventId = store.storeEvent(streamId, PrimingEventMessage)
656+
sseSession.send(id = primingEventId, retry = retryIntervalMillis, data = "")
657+
} catch (e: Exception) {
658+
_onError(e)
659+
}
660+
}
661+
648662
private fun ApplicationCall.appendSseHeaders() {
649663
this.response.headers.append(HttpHeaders.ContentType, ContentType.Text.EventStream.toString())
650664
this.response.headers.append(HttpHeaders.CacheControl, "no-cache, no-transform")

0 commit comments

Comments
 (0)