Skip to content

Commit 087cfae

Browse files
authored
fix: wait for agents in blink dev to stop streaming before restarting them (#52)
1 parent 6479d4e commit 087cfae

File tree

9 files changed

+223
-4
lines changed

9 files changed

+223
-4
lines changed

bun.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"@opentelemetry/sdk-trace-base": "^2.1.0",
4848
"@opentelemetry/sdk-trace-node": "^2.1.0",
4949
"@opentelemetry/semantic-conventions": "^1.37.0",
50+
"@rocicorp/lock": "^1.0.4",
5051
"@types/marked-terminal": "^6.1.1",
5152
"@types/react": "^19.2.2",
5253
"@types/ws": "^8.18.1",
@@ -710,6 +711,10 @@
710711

711712
"@radix-ui/react-use-layout-effect": ["@radix-ui/react-use-layout-effect@1.1.1", "", { "peerDependencies": { "@types/react": "*", "react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc" }, "optionalPeers": ["@types/react"] }, "sha512-RbJRS4UWQFkzHTTwVymMTUv8EqYhOp8dOOviLj2ugtTiXRaRQS7GLGxZTLL1jWhMeoSCf5zmcZkqTl9IiYfXcQ=="],
712713

714+
"@rocicorp/lock": ["@rocicorp/lock@1.0.4", "", { "dependencies": { "@rocicorp/resolver": "^1.0.2" } }, "sha512-FavTiO8ETXFXDVfA87IThGduTTTR8iqzBnr/c60gUUmbk7knGEXPmf2B+yiNuluJD0ku0fL2V2r62UXnsLXl6w=="],
715+
716+
"@rocicorp/resolver": ["@rocicorp/resolver@1.0.2", "", {}, "sha512-TfjMTQp9cNNqNtHFfa+XHEGdA7NnmDRu+ZJH4YF3dso0Xk/b9DMhg/sl+b6CR4ThFZArXXDsG1j8Mwl34wcOZQ=="],
717+
713718
"@rolldown/binding-android-arm64": ["@rolldown/binding-android-arm64@1.0.0-beta.43", "", { "os": "android", "cpu": "arm64" }, "sha512-TP8bcPOb1s6UmY5syhXrDn9k0XkYcw+XaoylTN4cJxf0JOVS2j682I3aTcpfT51hOFGr2bRwNKN9RZ19XxeQbA=="],
714719

715720
"@rolldown/binding-darwin-arm64": ["@rolldown/binding-darwin-arm64@1.0.0-beta.43", "", { "os": "darwin", "cpu": "arm64" }, "sha512-kuVWnZsE4vEjMF/10SbSUyzucIW2zmdsqFghYMqy+fsjXnRHg0luTU6qWF8IqJf4Cbpm9NEZRnjIEPpAbdiSNQ=="],

packages/blink/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
"@opentelemetry/sdk-trace-base": "^2.1.0",
101101
"@opentelemetry/sdk-trace-node": "^2.1.0",
102102
"@opentelemetry/semantic-conventions": "^1.37.0",
103+
"@rocicorp/lock": "^1.0.4",
103104
"@types/marked-terminal": "^6.1.1",
104105
"@types/react": "^19.2.2",
105106
"@types/ws": "^8.18.1",

packages/blink/src/agent/client/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { convertResponseToUIMessageStream } from "../internal/convert-response-t
1212
import type { ID } from "../types";
1313
import type { UIOptions, UIOptionsSchema } from "../ui";
1414
import { APIServerURLEnvironmentVariable } from "../constants";
15+
import { RWLock } from "../../local/rw-lock";
1516

1617
export { APIServerURLEnvironmentVariable };
1718

@@ -30,10 +31,12 @@ export type CapabilitiesResponse = Awaited<ReturnType<Client["capabilities"]>>;
3031
export class Client {
3132
public readonly baseUrl: string;
3233
private readonly client: ReturnType<typeof hc<typeof api>>;
34+
public readonly agentLock: RWLock;
3335

3436
public constructor(options: ClientOptions) {
3537
this.client = hc<typeof api>(options.baseUrl);
3638
this.baseUrl = options.baseUrl;
39+
this.agentLock = new RWLock();
3740
}
3841

3942
/**

packages/blink/src/local/chat-manager.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { join } from "node:path";
55
import type { UIMessage, UIMessageChunk } from "ai";
66
import { ChatManager, type ChatStatus } from "./chat-manager";
77
import { createDiskStore } from "./disk-store";
8+
import { RWLock } from "./rw-lock";
89
import type { StoredChat, StoredMessage } from "./types";
910
import type { Client } from "../agent/client";
1011

@@ -14,6 +15,7 @@ function createMockAgent(
1415
): Client & { chatCalls: any[] } {
1516
const chatCalls: any[] = [];
1617
return {
18+
agentLock: new RWLock(),
1719
chatCalls,
1820
chat: async ({ messages, signal }: any) => {
1921
chatCalls.push({ messages, signal });
@@ -69,6 +71,7 @@ function createMockAgent(
6971
// Helper to create a slow-streaming agent (yields control between chunks)
7072
function createSlowAgent(chunks: number = 5): Client {
7173
return {
74+
agentLock: new RWLock(),
7275
chat: async ({ signal }: any) => {
7376
const stream = new ReadableStream<UIMessageChunk>({
7477
async start(controller) {

packages/blink/src/local/chat-manager.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,8 @@ export class ChatManager {
383383
};
384384
});
385385

386+
// Acquire read lock on agent to prevent it from being disposed while streaming.
387+
using _agentLock = await this.agent.agentLock.read();
386388
// Stream agent response
387389
const streamStartTime = performance.now();
388390
const stream = await runAgent({
@@ -541,7 +543,7 @@ export class ChatManager {
541543
this.disposed = true;
542544
this.watcher.dispose();
543545
this.listeners.clear();
544-
this.abortController?.abort();
546+
this.stopStreaming();
545547
}
546548

547549
private resetChatState(): void {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import { expect, test } from "bun:test";
2+
import { RWLock } from "./rw-lock";
3+
4+
// Note: The actual lock logic is handled by the @rocicorp/lock library.
5+
// These tests verify that our wrapper correctly implements Symbol.asyncDispose
6+
// and that basic read/write lock semantics work as expected.
7+
test("RWLock: write lock blocks readers", async () => {
8+
const lock = new RWLock();
9+
const events: string[] = [];
10+
11+
let writerAcquired: () => void = () => {
12+
throw new Error("writerAcquired not set");
13+
};
14+
let releaseWriter: () => void = () => {
15+
throw new Error("releaseWriter not set");
16+
};
17+
const writerHasAcquired = new Promise<void>((resolve) => {
18+
writerAcquired = resolve;
19+
});
20+
const writerCanRelease = new Promise<void>((resolve) => {
21+
releaseWriter = resolve;
22+
});
23+
24+
// Acquire write lock
25+
const writerPromise = (async () => {
26+
using writeLock = await lock.write();
27+
events.push("writer-acquired");
28+
writerAcquired();
29+
await writerCanRelease;
30+
// give a chance for a bug to happen. it shouldn't!
31+
await new Promise((resolve) => setTimeout(resolve, 10));
32+
events.push("writer-releasing");
33+
})();
34+
35+
// Wait for writer to actually acquire the lock
36+
await writerHasAcquired;
37+
38+
// Try to acquire read lock (should wait)
39+
const readerPromise = (async () => {
40+
events.push("reader-waiting");
41+
using readLock = await lock.read();
42+
events.push("reader-acquired");
43+
})();
44+
45+
releaseWriter();
46+
47+
await Promise.all([writerPromise, readerPromise]);
48+
49+
// Reader should wait for writer to release
50+
expect(events).toEqual([
51+
"writer-acquired",
52+
"reader-waiting",
53+
"writer-releasing",
54+
"reader-acquired",
55+
]);
56+
});
57+
58+
test("RWLock: readers block write lock", async () => {
59+
const lock = new RWLock();
60+
const events: string[] = [];
61+
62+
let readerAcquired: () => void = () => {
63+
throw new Error("readerAcquired not set");
64+
};
65+
let releaseReader: () => void = () => {
66+
throw new Error("releaseReader not set");
67+
};
68+
const readerHasAcquired = new Promise<void>((resolve) => {
69+
readerAcquired = resolve;
70+
});
71+
const readerCanRelease = new Promise<void>((resolve) => {
72+
releaseReader = resolve;
73+
});
74+
75+
// Acquire read lock
76+
const readerPromise = (async () => {
77+
using readLock = await lock.read();
78+
events.push("reader-acquired");
79+
readerAcquired();
80+
await readerCanRelease;
81+
// give a chance for a bug to happen. it shouldn't!
82+
await new Promise((resolve) => setTimeout(resolve, 10));
83+
events.push("reader-releasing");
84+
})();
85+
86+
// Wait for reader to actually acquire the lock
87+
await readerHasAcquired;
88+
89+
// Try to acquire write lock (should wait)
90+
const writerPromise = (async () => {
91+
events.push("writer-waiting");
92+
using writeLock = await lock.write();
93+
events.push("writer-acquired");
94+
})();
95+
96+
releaseReader();
97+
98+
await Promise.all([readerPromise, writerPromise]);
99+
100+
// Writer should wait for reader to release
101+
expect(events).toEqual([
102+
"reader-acquired",
103+
"writer-waiting",
104+
"reader-releasing",
105+
"writer-acquired",
106+
]);
107+
});
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { RWLock as RocicorpRWLock } from "@rocicorp/lock";
2+
3+
export class ReadLock {
4+
constructor(private onRelease: () => void) {}
5+
6+
[Symbol.dispose](): void {
7+
this.onRelease();
8+
}
9+
}
10+
11+
export class WriteLock {
12+
constructor(private onRelease: () => void) {}
13+
14+
[Symbol.dispose](): void {
15+
this.onRelease();
16+
}
17+
}
18+
19+
/**
20+
* RWLock is a read/write lock that allows multiple concurrent readers
21+
* or a single writer. Writers wait for all readers to finish before acquiring
22+
* the lock.
23+
*
24+
* This is designed for in-memory, single-process synchronization using the
25+
* explicit resource management pattern (Symbol.dispose).
26+
*
27+
* Example usage:
28+
* ```typescript
29+
* const lock = new RWLock();
30+
*
31+
* // Multiple readers can acquire the lock concurrently
32+
* using readLock = await lock.read();
33+
* // ... do read operations ...
34+
*
35+
* // Writers wait for all readers to finish
36+
* using writeLock = await lock.write();
37+
* // ... do write operations ...
38+
* ```
39+
*/
40+
export class RWLock {
41+
private _lock = new RocicorpRWLock();
42+
43+
/**
44+
* Acquire a read lock. Multiple readers can hold the lock concurrently.
45+
* If a writer is waiting, new readers will wait until the writer completes.
46+
*
47+
* The lock is automatically released when the returned object is disposed.
48+
*/
49+
async read(): Promise<ReadLock> {
50+
const release = await this._lock.read();
51+
return new ReadLock(release);
52+
}
53+
54+
/**
55+
* Acquire a write lock. Waits for all readers to finish before acquiring.
56+
* Only one writer can hold the lock at a time.
57+
*
58+
* The lock is automatically released when the returned object is disposed.
59+
*/
60+
async write(): Promise<WriteLock> {
61+
const release = await this._lock.write();
62+
return new WriteLock(release);
63+
}
64+
}

packages/blink/src/react/use-agent.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
type CapabilitiesResponse,
88
APIServerURLEnvironmentVariable,
99
} from "../agent/client";
10+
import { RWLock } from "../local/rw-lock";
1011

1112
export interface AgentLog {
1213
readonly level: "log" | "error";
@@ -53,6 +54,8 @@ export default function useAgent(options: UseAgentOptions) {
5354
setAgent(undefined);
5455
setCapabilities(undefined);
5556

57+
let lock: RWLock | undefined;
58+
5659
(async () => {
5760
const port = await getRandomPort();
5861
const proc = spawn("node", ["--no-deprecation", buildResult.entry], {
@@ -64,11 +67,27 @@ export default function useAgent(options: UseAgentOptions) {
6467
PORT: port.toString(),
6568
HOST: "127.0.0.1",
6669
},
70+
// keep the child process tied to the parent process
71+
detached: false,
6772
});
68-
controller.signal.addEventListener("abort", () => {
73+
const cleanup = () => {
6974
try {
7075
proc.kill();
7176
} catch {}
77+
};
78+
79+
// Clean up - when the parent process exits, kill the child process.
80+
process.once("exit", cleanup);
81+
process.once("SIGINT", cleanup);
82+
process.once("SIGTERM", cleanup);
83+
process.once("uncaughtException", cleanup);
84+
85+
controller.signal.addEventListener("abort", () => {
86+
process.off("exit", cleanup);
87+
process.off("SIGINT", cleanup);
88+
process.off("SIGTERM", cleanup);
89+
process.off("uncaughtException", cleanup);
90+
cleanup();
7291
});
7392
let ready = false;
7493
proc.stdout.on("data", (data) => {
@@ -114,6 +133,7 @@ export default function useAgent(options: UseAgentOptions) {
114133
const client = new Client({
115134
baseUrl: `http://127.0.0.1:${port}`,
116135
});
136+
lock = client.agentLock;
117137
// Wait for the health endpoint to be alive.
118138
while (!controller.signal.aborted) {
119139
try {
@@ -139,7 +159,12 @@ export default function useAgent(options: UseAgentOptions) {
139159
});
140160
return () => {
141161
isCleanup = true;
142-
controller.abort();
162+
(async () => {
163+
// Acquire write lock before cleaning up this agent instance
164+
// This waits for any active streams using this agent to complete
165+
using _writeLock = await lock?.write();
166+
controller.abort();
167+
})();
143168
};
144169
}, [buildResult, env, apiServerUrl]);
145170

packages/blink/src/react/use-edit-agent.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
getEditModeModel,
77
type EditAgent,
88
} from "../edit/agent";
9+
import { RWLock } from "../local/rw-lock";
910

1011
export interface UseEditAgentOptions {
1112
readonly directory: string;
@@ -35,6 +36,8 @@ export default function useEditAgent(options: UseEditAgentOptions) {
3536

3637
setMissingApiKey(false);
3738

39+
let lock: RWLock | undefined;
40+
3841
(async () => {
3942
// Create the edit agent
4043
editAgentRef.current = createEditAgent({
@@ -64,6 +67,7 @@ export default function useEditAgent(options: UseEditAgentOptions) {
6467
const editClient = new Client({
6568
baseUrl: `http://127.0.0.1:${port}`,
6669
});
70+
lock = editClient.agentLock;
6771

6872
// Wait for health check
6973
while (!controller.signal.aborted) {
@@ -88,7 +92,12 @@ export default function useEditAgent(options: UseEditAgentOptions) {
8892

8993
return () => {
9094
isCleanup = true;
91-
controller.abort();
95+
(async () => {
96+
// Acquire write lock before cleaning up this edit agent instance
97+
// This waits for any active streams using this agent to complete
98+
using _writeLock = await lock?.write();
99+
controller.abort();
100+
})();
92101
};
93102
}, [
94103
options.directory,

0 commit comments

Comments
 (0)