Skip to content

Commit e8600ca

Browse files
committed
Added retryIntervalMillis to StreamableHttpServerTransport for SSE reconnection hints and refactored SSE flush logic into flushSse function.
1 parent 99a030f commit e8600ca

File tree

1 file changed

+14
-3
lines changed

1 file changed

+14
-3
lines changed

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ private data class SessionContext(val session: ServerSSESession?, val call: Appl
105105
* If not specified, origin validation is disabled.
106106
* @param eventStore Event store for resumability support
107107
* If provided, resumability will be enabled, allowing clients to reconnect and resume messages
108+
* @param retryIntervalMillis Retry interval (in milliseconds) advertised via SSE priming events to hint the client when to reconnect.
109+
* Applies only when an [eventStore] is configured. Defaults to `null` (no retry hint).
108110
*/
109111
@OptIn(ExperimentalUuidApi::class, ExperimentalAtomicApi::class)
110112
public class StreamableHttpServerTransport(
@@ -113,6 +115,7 @@ public class StreamableHttpServerTransport(
113115
private val allowedHosts: List<String>? = null,
114116
private val allowedOrigins: List<String>? = null,
115117
private val eventStore: EventStore? = null,
118+
private val retryIntervalMillis: Long? = null,
116119
) : AbstractTransport() {
117120
public var sessionId: String? = null
118121
private set
@@ -346,7 +349,7 @@ public class StreamableHttpServerTransport(
346349
val streamId = Uuid.random().toString()
347350
if (!enableJsonResponse) {
348351
call.appendSseHeaders()
349-
session?.send(data = "") // flush headers immediately
352+
flushSse(session) // flush headers immediately
350353
}
351354

352355
streamMutex.withLock {
@@ -406,7 +409,7 @@ public class StreamableHttpServerTransport(
406409
}
407410

408411
call.appendSseHeaders()
409-
session.send(data = "") // flush headers immediately
412+
flushSse(session) // flush headers immediately
410413
streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(session, call)
411414
session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) }
412415
}
@@ -455,7 +458,7 @@ public class StreamableHttpServerTransport(
455458
}
456459

457460
call.appendSseHeaders()
458-
session.send(data = "") // flush headers immediately
461+
flushSse(session) // flush headers immediately
459462

460463
val streamId = store.replayEventsAfter(lastEventId) { eventId, message ->
461464
try {
@@ -562,6 +565,14 @@ public class StreamableHttpServerTransport(
562565
return null
563566
}
564567

568+
private suspend fun flushSse(session: ServerSSESession?) {
569+
try {
570+
session?.send(data = "")
571+
} catch (e: Exception) {
572+
_onError(e)
573+
}
574+
}
575+
565576
private suspend fun parseBody(call: ApplicationCall): List<JSONRPCMessage>? {
566577
val contentLength = call.request.header(HttpHeaders.ContentLength)?.toIntOrNull() ?: 0
567578
if (contentLength > MAXIMUM_MESSAGE_SIZE) {

0 commit comments

Comments
 (0)