@@ -64,9 +64,136 @@ def _init_session_yroom(self, session_id: str, path: str) -> YRoom:
6464 room_id = f"json:notebook:{ file_id } "
6565 yroom = self .yroom_manager .get_room (room_id )
6666 self ._room_ids [session_id ] = room_id
67-
6867 return yroom
6968
69+ async def _ensure_yroom_connected (self , session_id : str , kernel_id : str ) -> None :
70+ """
71+ Ensures that a session's yroom is connected to its kernel client.
72+
73+ This method is critical for maintaining the connection between collaborative
74+ document state (yroom) and kernel execution state. It handles scenarios where
75+ the yroom-kernel connection may have been lost, such as:
76+
77+ - Server restarts where sessions persist but in-memory connections are lost
78+ - Remote/persistent kernels that survive across server lifecycles
79+ - Recovery from transient failures or race conditions during session setup
80+
81+ The method is idempotent - it checks if the yroom is already connected before
82+ attempting to add it, preventing duplicate connections.
83+
84+ Args:
85+ session_id: The unique identifier for the session
86+ kernel_id: The unique identifier for the kernel
87+
88+ Note:
89+ This method silently handles cases where the yroom or kernel don't exist,
90+ or where the session has no associated yroom. Failures are logged but
91+ don't raise exceptions.
92+ """
93+ # Check if this session has an associated yroom in the cache
94+ room_id = self ._room_ids .get (session_id )
95+
96+ # If not cached, populate it from the session's path
97+ # This handles persistent sessions that survive server restarts
98+ if not room_id :
99+ try :
100+ # Get the session from the database to find its path
101+ # Use super() to avoid infinite recursion since we're called from get_session
102+ session = await super ().get_session (session_id = session_id )
103+ if session and session .get ("type" ) == "notebook" :
104+ path = session .get ("path" )
105+ if path :
106+ # Use the same logic as _init_session_yroom to calculate room_id
107+ file_id = self .file_id_manager .index (path )
108+ room_id = f"json:notebook:{ file_id } "
109+ # Cache it for future calls
110+ self ._room_ids [session_id ] = room_id
111+ self .log .debug (f"Populated room_id { room_id } from session path for session { session_id } " )
112+ else :
113+ self .log .debug (f"Session { session_id } has no path" )
114+ return
115+ else :
116+ self .log .debug (f"Session { session_id } is not a notebook type" )
117+ return
118+ except Exception as e :
119+ self .log .warning (f"Failed to lookup session { session_id } : { e } " )
120+ return
121+
122+ if not room_id :
123+ # Session has no yroom (e.g., console session or non-notebook type)
124+ return
125+
126+ # Get the yroom if it exists
127+ yroom = self .yroom_manager .get_room (room_id )
128+ if not yroom :
129+ # Room doesn't exist yet or was cleaned up
130+ return
131+
132+ # Ensure the yroom is added to the kernel client
133+ try :
134+ kernel_manager = self .serverapp .kernel_manager .get_kernel (kernel_id )
135+ kernel_client = kernel_manager .kernel_client
136+
137+ # Check if yroom is already connected to avoid duplicate connections
138+ if hasattr (kernel_client , '_yrooms' ) and yroom not in kernel_client ._yrooms :
139+ await kernel_client .add_yroom (yroom )
140+ self .log .info (
141+ f"Reconnected yroom { room_id } to kernel_client for session { session_id } . "
142+ f"This ensures kernel messages are routed to the collaborative document."
143+ )
144+ except Exception as e :
145+ # Log but don't fail - the session is still valid even if yroom connection fails
146+ self .log .warning (
147+ f"Failed to connect yroom to kernel_client for session { session_id } : { e } "
148+ )
149+
150+ async def get_session (self , ** kwargs ) -> Optional [dict [str , Any ]]:
151+ """
152+ Retrieves a session and ensures the yroom-kernel connection is established.
153+
154+ This override of the parent's get_session() adds a critical step: verifying
155+ and restoring the connection between the session's yroom (collaborative state)
156+ and its kernel client (execution engine).
157+
158+ Why this matters:
159+ - When reconnecting to persistent/remote kernels, the in-memory yroom connection
160+ may not exist even though both the session and kernel are valid
161+ - Server restarts can break yroom-kernel connections while sessions persist
162+ - This ensures that every time a session is retrieved, it's fully functional
163+ for collaborative notebook editing and execution
164+
165+ Args:
166+ **kwargs: Arguments passed to the parent's get_session() method
167+ (e.g., session_id, path, kernel_id)
168+
169+ Returns:
170+ The session model dict, or None if no session is found
171+ """
172+ session = await super ().get_session (** kwargs )
173+
174+ # If no session found, return None
175+ if session is None :
176+ return None
177+
178+ # Extract session and kernel information
179+ session_id = session .get ("id" )
180+ kernel_info = session .get ("kernel" )
181+
182+ # Only process sessions with valid kernel and session ID
183+ if not kernel_info or not session_id :
184+ return session
185+
186+ kernel_id = kernel_info .get ("id" )
187+ if not kernel_id :
188+ return session
189+
190+ # Ensure the yroom is connected to the kernel client
191+ # This is especially important for persistent kernels that survive server restarts
192+ await self ._ensure_yroom_connected (session_id , kernel_id )
193+
194+ return session
195+
196+
70197 async def create_session (
71198 self ,
72199 path : Optional [str ] = None ,
@@ -103,7 +230,7 @@ async def create_session(
103230 awareness .set_local_state_field (
104231 "kernel" , {"execution_state" : "starting" }
105232 )
106-
233+
107234 # Now create the session and start the kernel
108235 session_model = await super ().create_session (
109236 path ,
@@ -129,16 +256,14 @@ async def create_session(
129256 self .log .warning (f"`name` or `path` was not given for new session at '{ path } '." )
130257 return session_model
131258
132- # Otherwise, add the YRoom to this session's kernel client.
133-
134259 # Store the room ID for this session
135260 if yroom :
136261 self ._room_ids [session_id ] = yroom .room_id
137262 else :
138263 # Shouldn't happen, but handle it anyway
139264 real_path = os .path .join (os .path .split (path )[0 ], name )
140265 yroom = self ._init_session_yroom (session_id , real_path )
141-
266+
142267 # Add YRoom to this session's kernel client
143268 # Ensure the kernel client is fully connected before proceeding
144269 # to avoid queuing messages on first execution
@@ -148,7 +273,6 @@ async def create_session(
148273 self .log .info (f"Connected yroom { yroom .room_id } to kernel { kernel_id } . yroom: { yroom } " )
149274 return session_model
150275
151-
152276 async def update_session (self , session_id : str , ** update ) -> None :
153277 """
154278 Updates the session identified by `session_id` using the keyword
0 commit comments