Skip to content

Commit cfa5fa8

Browse files
committed
πŸ€– feat: SSH connection pool with backoff and singleflighting
Prevents thundering herd issues with SSH connections by: - Adding SSHConnectionPool class with health tracking - Implementing exponential backoff (1s β†’ 5s β†’ 10s β†’ 20s β†’ 40s β†’ 60s cap) - Singleflighting concurrent connection attempts to same host - Probing unknown connections before first use - Skipping probes for known-healthy connections Integration points: - SSHRuntime.exec() and execSSHCommand() call acquireConnection() - PTYService calls acquireConnection() before spawning SSH terminals _Generated with mux_
1 parent b061c16 commit cfa5fa8

File tree

5 files changed

+405
-28
lines changed

5 files changed

+405
-28
lines changed

β€Žsrc/node/runtime/SSHRuntime.tsβ€Ž

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion";
2424
import { getProjectName } from "@/node/utils/runtime/helpers";
2525
import { getErrorMessage } from "@/common/utils/errors";
2626
import { execAsync, DisposableProcess } from "@/node/utils/disposableExec";
27-
import { getControlPath } from "./sshConnectionPool";
27+
import { getControlPath, sshConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
2828
import { getBashPath } from "@/node/utils/main/bashPath";
2929

3030
/**
@@ -40,19 +40,8 @@ const shescape = {
4040
},
4141
};
4242

43-
/**
44-
* SSH Runtime Configuration
45-
*/
46-
export interface SSHRuntimeConfig {
47-
/** SSH host (can be hostname, user@host, or SSH config alias) */
48-
host: string;
49-
/** Working directory on remote host */
50-
srcBaseDir: string;
51-
/** Optional: Path to SSH private key (if not using ~/.ssh/config or ssh-agent) */
52-
identityFile?: string;
53-
/** Optional: SSH port (default: 22) */
54-
port?: number;
55-
}
43+
// Re-export SSHRuntimeConfig from connection pool to maintain API compatibility
44+
export type { SSHRuntimeConfig } from "./sshConnectionPool";
5645

5746
/**
5847
* SSH runtime implementation that executes commands and file operations
@@ -92,7 +81,6 @@ export class SSHRuntime implements Runtime {
9281
/**
9382
* Execute command over SSH with streaming I/O
9483
*/
95-
// eslint-disable-next-line @typescript-eslint/require-await
9684
async exec(command: string, options: ExecOptions): Promise<ExecStream> {
9785
const startTime = performance.now();
9886

@@ -101,6 +89,10 @@ export class SSHRuntime implements Runtime {
10189
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
10290
}
10391

92+
// Ensure connection is healthy before executing
93+
// This provides backoff protection and singleflighting for concurrent requests
94+
await sshConnectionPool.acquireConnection(this.config);
95+
10496
// Build command parts
10597
const parts: string[] = [];
10698

@@ -404,6 +396,9 @@ export class SSHRuntime implements Runtime {
404396
* @private
405397
*/
406398
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
399+
// Ensure connection is healthy before executing
400+
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
401+
407402
const sshArgs = this.buildSSHArgs();
408403
sshArgs.push(this.config.host, command);
409404

β€Žsrc/node/runtime/sshConnectionPool.test.tsβ€Ž

Lines changed: 128 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as os from "os";
22
import * as path from "path";
3-
import { getControlPath } from "./sshConnectionPool";
4-
import type { SSHRuntimeConfig } from "./SSHRuntime";
3+
import { getControlPath, SSHConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
54

65
describe("sshConnectionPool", () => {
76
describe("getControlPath", () => {
@@ -134,3 +133,130 @@ describe("username isolation", () => {
134133
expect(controlPath).toMatch(/mux-ssh-[a-f0-9]{12}$/);
135134
});
136135
});
136+
137+
describe("SSHConnectionPool", () => {
138+
describe("health tracking", () => {
139+
test("getConnectionHealth returns undefined for unknown connection", () => {
140+
const pool = new SSHConnectionPool();
141+
const config: SSHRuntimeConfig = {
142+
host: "unknown.example.com",
143+
srcBaseDir: "/work",
144+
};
145+
146+
expect(pool.getConnectionHealth(config)).toBeUndefined();
147+
});
148+
149+
test("markHealthy sets connection to healthy state", () => {
150+
const pool = new SSHConnectionPool();
151+
const config: SSHRuntimeConfig = {
152+
host: "test.example.com",
153+
srcBaseDir: "/work",
154+
};
155+
156+
pool.markHealthy(config);
157+
const health = pool.getConnectionHealth(config);
158+
159+
expect(health).toBeDefined();
160+
expect(health!.status).toBe("healthy");
161+
expect(health!.consecutiveFailures).toBe(0);
162+
expect(health!.lastSuccess).toBeInstanceOf(Date);
163+
});
164+
165+
test("resetBackoff clears backoff state after failed probe", async () => {
166+
const pool = new SSHConnectionPool();
167+
const config: SSHRuntimeConfig = {
168+
host: "nonexistent.invalid.host.test",
169+
srcBaseDir: "/work",
170+
};
171+
172+
// Trigger a failure via acquireConnection (will fail to connect)
173+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
174+
175+
// Verify we're now in backoff
176+
const healthBefore = pool.getConnectionHealth(config);
177+
expect(healthBefore?.status).toBe("unhealthy");
178+
expect(healthBefore?.backoffUntil).toBeDefined();
179+
180+
// Reset backoff
181+
pool.resetBackoff(config);
182+
const healthAfter = pool.getConnectionHealth(config);
183+
184+
expect(healthAfter).toBeDefined();
185+
expect(healthAfter!.status).toBe("unknown");
186+
expect(healthAfter!.consecutiveFailures).toBe(0);
187+
expect(healthAfter!.backoffUntil).toBeUndefined();
188+
});
189+
});
190+
191+
describe("acquireConnection", () => {
192+
test("returns immediately for known healthy connection", async () => {
193+
const pool = new SSHConnectionPool();
194+
const config: SSHRuntimeConfig = {
195+
host: "test.example.com",
196+
srcBaseDir: "/work",
197+
};
198+
199+
// Mark as healthy first
200+
pool.markHealthy(config);
201+
202+
// Should return immediately without probing
203+
const start = Date.now();
204+
await pool.acquireConnection(config);
205+
const elapsed = Date.now() - start;
206+
207+
// Should be nearly instant (< 50ms)
208+
expect(elapsed).toBeLessThan(50);
209+
});
210+
211+
test("throws immediately when in backoff", async () => {
212+
const pool = new SSHConnectionPool();
213+
const config: SSHRuntimeConfig = {
214+
host: "nonexistent.invalid.host.test",
215+
srcBaseDir: "/work",
216+
};
217+
218+
// Trigger a failure to put connection in backoff
219+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
220+
221+
// Second call should throw immediately with backoff message
222+
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
223+
});
224+
225+
test("getControlPath returns deterministic path", () => {
226+
const pool = new SSHConnectionPool();
227+
const config: SSHRuntimeConfig = {
228+
host: "test.example.com",
229+
srcBaseDir: "/work",
230+
};
231+
232+
const path1 = pool.getControlPath(config);
233+
const path2 = pool.getControlPath(config);
234+
235+
expect(path1).toBe(path2);
236+
expect(path1).toBe(getControlPath(config));
237+
});
238+
});
239+
240+
describe("singleflighting", () => {
241+
test("concurrent acquireConnection calls share same probe", async () => {
242+
const pool = new SSHConnectionPool();
243+
const config: SSHRuntimeConfig = {
244+
host: "test.example.com",
245+
srcBaseDir: "/work",
246+
};
247+
248+
// Mark healthy to avoid actual probe
249+
pool.markHealthy(config);
250+
251+
// Multiple concurrent calls should all succeed
252+
const results = await Promise.all([
253+
pool.acquireConnection(config),
254+
pool.acquireConnection(config),
255+
pool.acquireConnection(config),
256+
]);
257+
258+
// All should resolve (no errors)
259+
expect(results).toHaveLength(3);
260+
});
261+
});
262+
});

0 commit comments

Comments
Β (0)