1+ import { createServer , type Server } from 'node:http' ;
2+ import { AddressInfo } from 'node:net' ;
3+ import { randomUUID } from 'node:crypto' ;
4+ import { Client } from '../client/index.js' ;
5+ import { StreamableHTTPClientTransport } from '../client/streamableHttp.js' ;
6+ import { McpServer } from '../server/mcp.js' ;
7+ import { EventStore , StreamableHTTPServerTransport } from '../server/streamableHttp.js' ;
8+ import { CallToolResult , CallToolResultSchema , JSONRPCMessage , LoggingMessageNotificationSchema } from '../types.js' ;
9+ import { z } from 'zod' ;
10+
11+ /**
12+ * Simple in-memory event store implementation for resumability
13+ */
14+ class InMemoryEventStore implements EventStore {
15+ private events : Map < string , { streamId : string , message : JSONRPCMessage } > = new Map ( ) ;
16+
17+ generateEventId ( streamId : string ) : string {
18+ return `${ streamId } _${ Date . now ( ) } _${ Math . random ( ) . toString ( 36 ) . substring ( 2 , 10 ) } ` ;
19+ }
20+
21+ getStreamIdFromEventId ( eventId : string ) : string {
22+ const parts = eventId . split ( '_' ) ;
23+ return parts . length > 0 ? parts [ 0 ] : '' ;
24+ }
25+
26+ async storeEvent ( streamId : string , message : JSONRPCMessage ) : Promise < string > {
27+ const eventId = this . generateEventId ( streamId ) ;
28+ this . events . set ( eventId , { streamId, message } ) ;
29+ return eventId ;
30+ }
31+
32+ async getEventsAfter ( lastEventId : string ) : Promise < Array < { eventId : string , message : JSONRPCMessage } > > {
33+ if ( ! lastEventId || ! this . events . has ( lastEventId ) ) {
34+ return [ ] ;
35+ }
36+
37+ // Extract the stream ID from the event ID
38+ const streamId = this . getStreamIdFromEventId ( lastEventId ) ;
39+ const result : Array < { eventId : string , message : JSONRPCMessage } > = [ ] ;
40+ let foundLastEvent = false ;
41+
42+ // Sort events by eventId for chronological ordering
43+ const sortedEvents = [ ...this . events . entries ( ) ] . sort ( ( a , b ) => a [ 0 ] . localeCompare ( b [ 0 ] ) ) ;
44+
45+ for ( const [ eventId , { streamId : eventStreamId , message } ] of sortedEvents ) {
46+ // Only include events from the same stream
47+ if ( eventStreamId !== streamId ) {
48+ continue ;
49+ }
50+
51+ // Start collecting events after we find the lastEventId
52+ if ( eventId === lastEventId ) {
53+ foundLastEvent = true ;
54+ continue ;
55+ }
56+
57+ if ( foundLastEvent ) {
58+ result . push ( { eventId, message } ) ;
59+ }
60+ }
61+
62+ return result ;
63+ }
64+ }
65+
66+
67+ describe ( 'Transport resumability' , ( ) => {
68+ let server : Server ;
69+ let mcpServer : McpServer ;
70+ let serverTransport : StreamableHTTPServerTransport ;
71+ let baseUrl : URL ;
72+ let eventStore : InMemoryEventStore ;
73+
74+ beforeEach ( async ( ) => {
75+ // Create event store for resumability
76+ eventStore = new InMemoryEventStore ( ) ;
77+
78+ // Create a simple MCP server
79+ mcpServer = new McpServer (
80+ { name : 'test-server' , version : '1.0.0' } ,
81+ { capabilities : { logging : { } } }
82+ ) ;
83+
84+ // Add a simple notification tool that completes quickly
85+ mcpServer . tool (
86+ 'send-notification' ,
87+ 'Sends a single notification' ,
88+ {
89+ message : z . string ( ) . describe ( 'Message to send' ) . default ( 'Test notification' )
90+ } ,
91+ async ( { message } , { sendNotification } ) => {
92+ // Send notification immediately
93+ await sendNotification ( {
94+ method : "notifications/message" ,
95+ params : {
96+ level : "info" ,
97+ data : message
98+ }
99+ } ) ;
100+
101+ return {
102+ content : [ { type : 'text' , text : 'Notification sent' } ]
103+ } ;
104+ }
105+ ) ;
106+
107+ // Add a long-running tool that sends multiple notifications
108+ mcpServer . tool (
109+ 'run-notifications' ,
110+ 'Sends multiple notifications over time' ,
111+ {
112+ count : z . number ( ) . describe ( 'Number of notifications to send' ) . default ( 10 ) ,
113+ interval : z . number ( ) . describe ( 'Interval between notifications in ms' ) . default ( 50 )
114+ } ,
115+ async ( { count, interval } , { sendNotification } ) => {
116+ // Send notifications at specified intervals
117+ for ( let i = 0 ; i < count ; i ++ ) {
118+ await sendNotification ( {
119+ method : "notifications/message" ,
120+ params : {
121+ level : "info" ,
122+ data : `Notification ${ i + 1 } of ${ count } `
123+ }
124+ } ) ;
125+
126+ // Wait for the specified interval before sending next notification
127+ if ( i < count - 1 ) {
128+ await new Promise ( resolve => setTimeout ( resolve , interval ) ) ;
129+ }
130+ }
131+
132+ return {
133+ content : [ { type : 'text' , text : `Sent ${ count } notifications` } ]
134+ } ;
135+ }
136+ ) ;
137+
138+ // Create a transport with the event store
139+ serverTransport = new StreamableHTTPServerTransport ( {
140+ sessionIdGenerator : ( ) => randomUUID ( ) ,
141+ eventStore
142+ } ) ;
143+
144+ // Connect the transport to the MCP server
145+ await mcpServer . connect ( serverTransport ) ;
146+
147+ // Create and start an HTTP server
148+ server = createServer ( async ( req , res ) => {
149+ await serverTransport . handleRequest ( req , res ) ;
150+ } ) ;
151+
152+ // Start the server on a random port
153+ baseUrl = await new Promise < URL > ( ( resolve ) => {
154+ server . listen ( 0 , '127.0.0.1' , ( ) => {
155+ const addr = server . address ( ) as AddressInfo ;
156+ resolve ( new URL ( `http://127.0.0.1:${ addr . port } ` ) ) ;
157+ } ) ;
158+ } ) ;
159+ } ) ;
160+
161+ afterEach ( async ( ) => {
162+ // Clean up resources
163+ await mcpServer . close ( ) . catch ( ( ) => { } ) ;
164+ await serverTransport . close ( ) . catch ( ( ) => { } ) ;
165+ server . close ( ) ;
166+ } ) ;
167+
168+ it ( 'should store session ID when client connects' , async ( ) => {
169+ // Create and connect a client
170+ const client = new Client ( {
171+ name : 'test-client' ,
172+ version : '1.0.0'
173+ } ) ;
174+
175+ const transport = new StreamableHTTPClientTransport ( baseUrl ) ;
176+ await client . connect ( transport ) ;
177+
178+ // Verify session ID was generated
179+ expect ( transport . sessionId ) . toBeDefined ( ) ;
180+
181+ // Clean up
182+ await transport . close ( ) ;
183+ } ) ;
184+
185+ it ( 'should have session ID functionality' , async ( ) => {
186+ // The ability to store a session ID when connecting
187+ const client = new Client ( {
188+ name : 'test-client-reconnection' ,
189+ version : '1.0.0'
190+ } ) ;
191+
192+ const transport = new StreamableHTTPClientTransport ( baseUrl ) ;
193+
194+ // Make sure the client can connect and get a session ID
195+ await client . connect ( transport ) ;
196+ expect ( transport . sessionId ) . toBeDefined ( ) ;
197+
198+ // Clean up
199+ await transport . close ( ) ;
200+ } ) ;
201+
202+ // This test demonstrates the capability to resume long-running tools
203+ // across client disconnection/reconnection
204+ it ( 'should resume long-running notifications with lastEventId' , async ( ) => {
205+ // Create unique client ID for this test
206+ const clientId = 'test-client-long-running' ;
207+ const notifications : any [ ] = [ ] ;
208+ let sessionId : string | undefined ;
209+ let lastEventId : string | undefined ;
210+
211+ // Create first client
212+ let client1 = new Client ( {
213+ id : clientId ,
214+ name : 'test-client' ,
215+ version : '1.0.0'
216+ } ) ;
217+
218+ // Set up notification handler for first client
219+ client1 . setNotificationHandler ( LoggingMessageNotificationSchema , ( notification : any ) => {
220+ if ( notification . method === 'notifications/message' ) {
221+ notifications . push ( notification . params ) ;
222+ }
223+ } ) ;
224+
225+ // Connect first client
226+ const transport1 = new StreamableHTTPClientTransport ( baseUrl ) ;
227+ await client1 . connect ( transport1 ) ;
228+ sessionId = transport1 . sessionId ;
229+ expect ( sessionId ) . toBeDefined ( ) ;
230+
231+ // Start a long-running notification stream with tracking of lastEventId
232+ const onLastEventIdUpdate = jest . fn ( ( eventId : string ) => {
233+ lastEventId = eventId ;
234+ } ) ;
235+
236+ // Start the notification tool with event tracking using request
237+ const toolPromise = client1 . request ( {
238+ method : 'tools/call' ,
239+ params : {
240+ name : 'run-notifications' ,
241+ arguments : {
242+ count : 5 ,
243+ interval : 10
244+ }
245+ }
246+ } , CallToolResultSchema , {
247+ lastEventId,
248+ onLastEventIdUpdate
249+ } ) ;
250+
251+ // Wait for some notifications to arrive (not all)
252+ await new Promise ( resolve => setTimeout ( resolve , 20 ) ) ;
253+
254+ // Verify we received some notifications and lastEventId was updated
255+ expect ( notifications . length ) . toBeGreaterThan ( 0 ) ;
256+ expect ( notifications . length ) . toBeLessThan ( 5 ) ;
257+ expect ( onLastEventIdUpdate ) . toHaveBeenCalled ( ) ;
258+ expect ( lastEventId ) . toBeDefined ( ) ;
259+
260+ // Store original notification count for later comparison
261+ const firstClientNotificationCount = notifications . length ;
262+
263+ // Disconnect first client without waiting for completion
264+ // When we close the connection, it will cause a ConnectionClosed error for
265+ // any in-progress requests, which is expected behavior
266+ // We need to catch the error since closing the transport will
267+ // cause the pending toolPromise to reject with a ConnectionClosed error
268+ await transport1 . close ( ) ;
269+
270+ // Try to cancel the promise, but ignore errors since it's already being handled
271+ toolPromise . catch ( err => {
272+ // This error is expected - the connection was intentionally closed
273+ if ( err ?. code !== - 32000 ) { // ConnectionClosed error code
274+ console . error ( "Unexpected error type during transport close:" , err ) ;
275+ }
276+ } ) ;
277+
278+
279+ // Create second client with same client ID
280+ const client2 = new Client ( {
281+ id : clientId ,
282+ name : 'test-client' ,
283+ version : '1.0.0'
284+ } ) ;
285+
286+ // Set up notification handler for second client
287+ client2 . setNotificationHandler ( LoggingMessageNotificationSchema , ( notification : any ) => {
288+ if ( notification . method === 'notifications/message' ) {
289+ notifications . push ( notification . params ) ;
290+ }
291+ } ) ;
292+
293+ // Connect second client with same session ID
294+ const transport2 = new StreamableHTTPClientTransport ( baseUrl , {
295+ sessionId
296+ } ) ;
297+ await client2 . connect ( transport2 ) ;
298+
299+ // Resume the notification stream using lastEventId
300+ // This is the key part - we're resuming the same long-running tool using lastEventId
301+ const resumedToolPromise = client2 . request ( {
302+ method : 'tools/call' ,
303+ params : {
304+ name : 'run-notifications' ,
305+ arguments : {
306+ count : 5 ,
307+ interval : 50
308+ }
309+ }
310+ } , CallToolResultSchema , {
311+ lastEventId, // Pass the lastEventId from the previous session
312+ onLastEventIdUpdate
313+ } ) ;
314+
315+ // Wait for remaining notifications
316+ await new Promise ( resolve => setTimeout ( resolve , 200 ) ) ;
317+
318+ // Verify we eventually received at leaset a few motifications
319+ expect ( notifications . length ) . toBeGreaterThan ( 2 ) ;
320+
321+ // Verify the second client received notifications that the first client didn't
322+ expect ( notifications . length ) . toBeGreaterThan ( firstClientNotificationCount ) ;
323+
324+ // Clean up
325+
326+ await transport2 . close ( ) ;
327+
328+ } ) ;
329+ } ) ;
0 commit comments