Skip to content

Commit 3e62bf5

Browse files
committed
Added getStreamIdForEventId to EventStore and updated StreamableHttpServerTransport for stream ID validation during event replay.
1 parent 9a68b8c commit 3e62bf5

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

kotlin-sdk-server/api/kotlin-sdk-server.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
public abstract interface class io/modelcontextprotocol/kotlin/sdk/server/EventStore {
2+
public abstract fun getStreamIdForEventId (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
23
public abstract fun replayEventsAfter (Ljava/lang/String;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
34
public abstract fun storeEvent (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
45
}

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ public interface EventStore {
6464
lastEventId: String,
6565
sender: suspend (eventId: String, message: JSONRPCMessage) -> Unit,
6666
): String
67+
68+
/**
69+
* Returns the stream ID associated with [eventId], or null if the event is unknown.
70+
* Default implementation is a no-op which disables extra validation during replay.
71+
*/
72+
public suspend fun getStreamIdForEventId(eventId: String): String?
6773
}
6874

6975
/**
@@ -416,6 +422,38 @@ public class StreamableHttpServerTransport(
416422
val call: ApplicationCall = session.call
417423

418424
try {
425+
var lookupSupported = true
426+
val lookupStreamId = try {
427+
store.getStreamIdForEventId(lastEventId)
428+
} catch (_: NotImplementedError) {
429+
lookupSupported = false
430+
null
431+
} catch (_: UnsupportedOperationException) {
432+
lookupSupported = false
433+
null
434+
}
435+
436+
if (lookupSupported) {
437+
val streamId = lookupStreamId
438+
?: run {
439+
call.reject(
440+
HttpStatusCode.BadRequest,
441+
RPCError.ErrorCode.CONNECTION_CLOSED,
442+
"Invalid event ID format",
443+
)
444+
return
445+
}
446+
447+
if (streamId in streamsMapping) {
448+
call.reject(
449+
HttpStatusCode.Conflict,
450+
RPCError.ErrorCode.CONNECTION_CLOSED,
451+
"Conflict: Stream already has an active connection",
452+
)
453+
return
454+
}
455+
}
456+
419457
call.appendSseHeaders()
420458
session.send(data = "") // flush headers immediately
421459

0 commit comments

Comments
 (0)