Skip to content

Commit 6029a41

Browse files
authored
chore: remove the agentLock field from blink client (#71)
1 parent 025192f commit 6029a41

File tree

9 files changed

+130
-108
lines changed

9 files changed

+130
-108
lines changed

packages/blink/src/agent/client/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,10 @@ export type CapabilitiesResponse = Awaited<ReturnType<Client["capabilities"]>>;
3131
export class Client {
3232
public readonly baseUrl: string;
3333
private readonly client: ReturnType<typeof hc<typeof api>>;
34-
public readonly agentLock: RWLock;
3534

3635
public constructor(options: ClientOptions) {
3736
this.client = hc<typeof api>(options.baseUrl);
3837
this.baseUrl = options.baseUrl;
39-
this.agentLock = new RWLock();
4038
}
4139

4240
/**

packages/blink/src/cli/run.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { resolveConfig } from "../build";
99
import { findNearestEntry } from "../build/util";
1010
import { existsSync } from "node:fs";
1111
import type { ID } from "../agent/types";
12+
import { RWLock } from "../local/rw-lock";
1213

1314
export default async function run(
1415
message: string[],
@@ -71,7 +72,7 @@ export default async function run(
7172
console.error("Error:", error);
7273
},
7374
});
74-
manager.setAgent(agent.client);
75+
manager.setAgent({ client: agent.client, lock: new RWLock() });
7576

7677
try {
7778
// Wait for completion by subscribing to state changes

packages/blink/src/local/chat-manager.test.ts

Lines changed: 90 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -10,99 +10,45 @@ import type { StoredChat, StoredMessage } from "./types";
1010
import type { Client } from "../agent/client";
1111

1212
// Helper to create a mock agent
13-
function createMockAgent(
14-
responseText: string = "Assistant response"
15-
): Client & { chatCalls: any[] } {
13+
function createMockAgent(responseText: string = "Assistant response"): {
14+
lock: RWLock;
15+
client: Client;
16+
chatCalls: any[];
17+
} {
1618
const chatCalls: any[] = [];
1719
return {
18-
agentLock: new RWLock(),
20+
lock: new RWLock(),
1921
chatCalls,
20-
chat: async ({ messages, signal }: any) => {
21-
chatCalls.push({ messages, signal });
22+
client: {
23+
chat: async ({ messages, signal }: any) => {
24+
chatCalls.push({ messages, signal });
2225

23-
// Return a ReadableStream of UIMessageChunk objects
24-
const stream = new ReadableStream<UIMessageChunk>({
25-
async start(controller) {
26-
if (signal?.aborted) {
27-
controller.close();
28-
return;
29-
}
30-
31-
// Start the message
32-
controller.enqueue({
33-
type: "start",
34-
messageId: "msg-1",
35-
} as UIMessageChunk);
36-
37-
// Add text content
38-
controller.enqueue({
39-
type: "text-start",
40-
id: "text-1",
41-
} as UIMessageChunk);
42-
43-
// Send text
44-
controller.enqueue({
45-
type: "text-delta",
46-
id: "text-1",
47-
delta: responseText,
48-
} as UIMessageChunk);
49-
50-
if (!signal?.aborted) {
51-
controller.enqueue({
52-
type: "text-end",
53-
id: "text-1",
54-
} as UIMessageChunk);
55-
56-
controller.enqueue({
57-
type: "finish",
58-
finishReason: "stop",
59-
usage: { promptTokens: 10, completionTokens: 5 },
60-
} as UIMessageChunk);
61-
}
62-
controller.close();
63-
},
64-
});
65-
66-
return stream;
67-
},
68-
} as any;
69-
}
70-
71-
// Helper to create a slow-streaming agent (yields control between chunks)
72-
function createSlowAgent(chunks: number = 5): Client {
73-
return {
74-
agentLock: new RWLock(),
75-
chat: async ({ signal }: any) => {
76-
const stream = new ReadableStream<UIMessageChunk>({
77-
async start(controller) {
78-
try {
26+
// Return a ReadableStream of UIMessageChunk objects
27+
const stream = new ReadableStream<UIMessageChunk>({
28+
async start(controller) {
7929
if (signal?.aborted) {
8030
controller.close();
8131
return;
8232
}
8333

34+
// Start the message
8435
controller.enqueue({
8536
type: "start",
8637
messageId: "msg-1",
8738
} as UIMessageChunk);
8839

40+
// Add text content
8941
controller.enqueue({
9042
type: "text-start",
9143
id: "text-1",
9244
} as UIMessageChunk);
9345

94-
for (let i = 0; i < chunks; i++) {
95-
if (signal?.aborted) {
96-
throw new Error("AbortError");
97-
}
98-
controller.enqueue({
99-
type: "text-delta",
100-
id: "text-1",
101-
delta: `chunk${i}`,
102-
} as UIMessageChunk);
103-
// Yield control to allow other operations
104-
await new Promise((resolve) => setImmediate(resolve));
105-
}
46+
// Send text
47+
controller.enqueue({
48+
type: "text-delta",
49+
id: "text-1",
50+
delta: responseText,
51+
} as UIMessageChunk);
10652

10753
if (!signal?.aborted) {
10854
controller.enqueue({
@@ -117,18 +63,78 @@ function createSlowAgent(chunks: number = 5): Client {
11763
} as UIMessageChunk);
11864
}
11965
controller.close();
120-
} catch (err: any) {
121-
if (err.message === "AbortError" || signal?.aborted) {
66+
},
67+
});
68+
69+
return stream;
70+
},
71+
} as any,
72+
};
73+
}
74+
75+
// Helper to create a slow-streaming agent (yields control between chunks)
76+
function createSlowAgent(chunks: number = 5): { client: Client; lock: RWLock } {
77+
return {
78+
lock: new RWLock(),
79+
client: {
80+
chat: async ({ signal }: any) => {
81+
const stream = new ReadableStream<UIMessageChunk>({
82+
async start(controller) {
83+
try {
84+
if (signal?.aborted) {
85+
controller.close();
86+
return;
87+
}
88+
89+
controller.enqueue({
90+
type: "start",
91+
messageId: "msg-1",
92+
} as UIMessageChunk);
93+
94+
controller.enqueue({
95+
type: "text-start",
96+
id: "text-1",
97+
} as UIMessageChunk);
98+
99+
for (let i = 0; i < chunks; i++) {
100+
if (signal?.aborted) {
101+
throw new Error("AbortError");
102+
}
103+
controller.enqueue({
104+
type: "text-delta",
105+
id: "text-1",
106+
delta: `chunk${i}`,
107+
} as UIMessageChunk);
108+
// Yield control to allow other operations
109+
await new Promise((resolve) => setImmediate(resolve));
110+
}
111+
112+
if (!signal?.aborted) {
113+
controller.enqueue({
114+
type: "text-end",
115+
id: "text-1",
116+
} as UIMessageChunk);
117+
118+
controller.enqueue({
119+
type: "finish",
120+
finishReason: "stop",
121+
usage: { promptTokens: 10, completionTokens: 5 },
122+
} as UIMessageChunk);
123+
}
122124
controller.close();
123-
} else {
124-
controller.error(err);
125+
} catch (err: any) {
126+
if (err.message === "AbortError" || signal?.aborted) {
127+
controller.close();
128+
} else {
129+
controller.error(err);
130+
}
125131
}
126-
}
127-
},
128-
});
129-
return stream;
130-
},
131-
} as any;
132+
},
133+
});
134+
return stream;
135+
},
136+
} as any,
137+
};
132138
}
133139

134140
// Helper to create a stored message

packages/blink/src/local/chat-manager.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
import type { ID } from "../agent/types";
1818
import { stripVTControlCharacters } from "node:util";
1919
import { RWLock } from "./rw-lock";
20+
import type { Agent } from "../react/use-agent";
2021

2122
export type ChatStatus = "idle" | "streaming" | "error";
2223

@@ -60,7 +61,7 @@ type StateListener = (state: ChatState) => void;
6061
*/
6162
export class ChatManager {
6263
private chatId: ID;
63-
private agent: Client | undefined;
64+
private agent: Agent | undefined;
6465
private chatStore: Store<StoredChat>;
6566
private serializeMessage?: (message: UIMessage) => StoredMessage | undefined;
6667
private filterMessages?: (message: StoredMessage) => boolean;
@@ -171,7 +172,7 @@ export class ChatManager {
171172
/**
172173
* Update the agent instance to be used for chats
173174
*/
174-
setAgent(agent: Client | undefined): void {
175+
setAgent(agent: Agent | undefined): void {
175176
this.agent = agent;
176177
}
177178

@@ -428,11 +429,11 @@ export class ChatManager {
428429
});
429430

430431
// Acquire read lock on agent to prevent it from being disposed while streaming.
431-
using _agentLock = await this.agent.agentLock.read();
432+
using _agentLock = await this.agent.lock.read();
432433
// Stream agent response
433434
const streamStartTime = performance.now();
434435
const stream = await runAgent({
435-
agent: this.agent,
436+
agent: this.agent.client,
436437
id: this.chatId as ID,
437438
signal: controller.signal,
438439
messages,

packages/blink/src/local/server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import { ChatManager } from "./chat-manager";
1010
import { createDiskStore } from "./disk-store";
1111
import { convertMessage, type StoredChat } from "./types";
1212
import { v5 as uuidv5 } from "uuid";
13+
import type { Agent } from "../react/use-agent";
1314

1415
export interface CreateLocalServerOptions {
1516
readonly dataDirectory: string;
1617
readonly port?: number;
17-
readonly getAgent: () => Client | undefined;
18+
readonly getAgent: () => Agent | undefined;
1819
}
1920

2021
export type LocalServer = ReturnType<typeof createLocalServer>;

packages/blink/src/react/use-agent.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ export interface UseAgentOptions {
2020
readonly apiServerUrl?: string;
2121
}
2222

23+
export interface Agent {
24+
readonly client: Client;
25+
readonly lock: RWLock;
26+
}
27+
2328
// useAgent is a hook that provides a client for an agent at the given entrypoint.
2429
export default function useAgent(options: UseAgentOptions) {
25-
const [agent, setAgent] = useState<Client | undefined>(undefined);
30+
const [agent, setAgent] = useState<Agent | undefined>(undefined);
2631
const [logs, setLogs] = useState<AgentLog[]>([]);
2732
const [error, setError] = useState<Error | undefined>(undefined);
2833
const [buildResult, setBuildResult] = useState(options.buildResult);
@@ -133,7 +138,8 @@ export default function useAgent(options: UseAgentOptions) {
133138
const client = new Client({
134139
baseUrl: `http://127.0.0.1:${port}`,
135140
});
136-
lock = client.agentLock;
141+
const agentLock = new RWLock();
142+
lock = agentLock;
137143
// Wait for the health endpoint to be alive.
138144
while (!controller.signal.aborted) {
139145
try {
@@ -150,7 +156,7 @@ export default function useAgent(options: UseAgentOptions) {
150156
ready = true;
151157
const capabilities = await client.capabilities();
152158
setCapabilities(capabilities);
153-
setAgent(client);
159+
setAgent({ client, lock: agentLock });
154160
})().catch((err) => {
155161
// Don't set error if this was just a cleanup abort
156162
if (!isCleanup) {

packages/blink/src/react/use-chat.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import type { Client } from "../agent/client";
44
import { ChatManager, type ChatState } from "../local/chat-manager";
55
import type { StoredMessage } from "../local/types";
66
import type { ID } from "../agent/types";
7+
import type { Agent } from "./use-agent";
78

89
export type { ChatStatus } from "../local/chat-manager";
910

1011
export interface UseChatOptions {
1112
readonly chatId: ID;
12-
readonly agent: Client | undefined;
13+
readonly agent: Agent | undefined;
1314
readonly chatsDirectory: string;
1415
/**
1516
* Optional function to filter messages before persisting them.

packages/blink/src/react/use-dev-mode.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { isLogMessage, isStoredMessageMetadata } from "../local/types";
1111
import type { BuildLog } from "../build";
1212
import type { ID, UIOptions, UIOptionsSchema } from "../agent/index.browser";
1313
import useOptions from "./use-options";
14-
import useAgent, { type AgentLog } from "./use-agent";
14+
import useAgent, { type AgentLog, type Agent } from "./use-agent";
1515
import useBundler, { type BundlerStatus } from "./use-bundler";
1616
import useChat, { type UseChat } from "./use-chat";
1717
import useDevhook from "./use-devhook";
@@ -196,7 +196,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
196196
}, [env, options.onEnvLoaded]);
197197

198198
// Server - always use run agent for webhook/API handling
199-
const runAgentRef = useRef<Client | undefined>(undefined);
199+
const runAgentRef = useRef<Agent | undefined>(undefined);
200200
const server = useMemo(() => {
201201
return createLocalServer({
202202
port: 0,
@@ -219,7 +219,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
219219

220220
// Edit agent
221221
const {
222-
client: editAgent,
222+
agent: editAgent,
223223
error: editAgentError,
224224
missingApiKey: editModeMissingApiKey,
225225
setUserAgentUrl,
@@ -247,7 +247,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
247247
// Update edit agent with user agent URL and handle cleanup
248248
useEffect(() => {
249249
if (agent) {
250-
setUserAgentUrl(agent.baseUrl);
250+
setUserAgentUrl(agent.client.baseUrl);
251251
}
252252

253253
// Stop streaming when agents become unavailable
@@ -382,7 +382,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
382382

383383
// Always send the request to the user's agent (not the edit agent)
384384
const requestURL = new URL(request.url);
385-
const agentURL = new URL(agent.baseUrl);
385+
const agentURL = new URL(agent.client.baseUrl);
386386
agentURL.pathname = requestURL.pathname;
387387
agentURL.search = requestURL.search;
388388

@@ -431,7 +431,7 @@ export default function useDevMode(options: UseDevModeOptions): UseDevMode {
431431
error: optionsError,
432432
setOption,
433433
} = useOptions({
434-
agent: mode === "run" ? agent : editAgent,
434+
agent: mode === "run" ? agent?.client : editAgent?.client,
435435
capabilities,
436436
messages: chat.messages,
437437
});

0 commit comments

Comments
 (0)