Skip to content

Commit 53de00a

Browse files
committed
🤖 feat: SSH connection pool with backoff and singleflighting
Prevents thundering herd issues with SSH connections by: - Adding SSHConnectionPool class with health tracking - Implementing exponential backoff (1s → 5s → 10s → 20s → 40s → 60s cap) - Singleflighting concurrent connection attempts to same host - Probing unknown connections before first use - Skipping probes for known-healthy connections Integration points: - SSHRuntime.exec() and execSSHCommand() call acquireConnection() - PTYService calls acquireConnection() before spawning SSH terminals _Generated with mux_
1 parent b061c16 commit 53de00a

File tree

5 files changed

+456
-29
lines changed

5 files changed

+456
-29
lines changed

src/node/runtime/SSHRuntime.ts

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion";
2424
import { getProjectName } from "@/node/utils/runtime/helpers";
2525
import { getErrorMessage } from "@/common/utils/errors";
2626
import { execAsync, DisposableProcess } from "@/node/utils/disposableExec";
27-
import { getControlPath } from "./sshConnectionPool";
27+
import { getControlPath, sshConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
2828
import { getBashPath } from "@/node/utils/main/bashPath";
2929

3030
/**
@@ -40,19 +40,8 @@ const shescape = {
4040
},
4141
};
4242

43-
/**
44-
* SSH Runtime Configuration
45-
*/
46-
export interface SSHRuntimeConfig {
47-
/** SSH host (can be hostname, user@host, or SSH config alias) */
48-
host: string;
49-
/** Working directory on remote host */
50-
srcBaseDir: string;
51-
/** Optional: Path to SSH private key (if not using ~/.ssh/config or ssh-agent) */
52-
identityFile?: string;
53-
/** Optional: SSH port (default: 22) */
54-
port?: number;
55-
}
43+
// Re-export SSHRuntimeConfig from connection pool to maintain API compatibility
44+
export type { SSHRuntimeConfig } from "./sshConnectionPool";
5645

5746
/**
5847
* SSH runtime implementation that executes commands and file operations
@@ -92,7 +81,6 @@ export class SSHRuntime implements Runtime {
9281
/**
9382
* Execute command over SSH with streaming I/O
9483
*/
95-
// eslint-disable-next-line @typescript-eslint/require-await
9684
async exec(command: string, options: ExecOptions): Promise<ExecStream> {
9785
const startTime = performance.now();
9886

@@ -101,6 +89,10 @@ export class SSHRuntime implements Runtime {
10189
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
10290
}
10391

92+
// Ensure connection is healthy before executing
93+
// This provides backoff protection and singleflighting for concurrent requests
94+
await sshConnectionPool.acquireConnection(this.config);
95+
10496
// Build command parts
10597
const parts: string[] = [];
10698

@@ -216,11 +208,22 @@ export class SSHRuntime implements Runtime {
216208
resolve(EXIT_CODE_TIMEOUT);
217209
return;
218210
}
219-
resolve(code ?? (signal ? -1 : 0));
211+
212+
const exitCode = code ?? (signal ? -1 : 0);
213+
214+
// SSH exit code 255 indicates connection failure - report to pool for backoff
215+
// This prevents thundering herd when a previously healthy host goes down
216+
if (exitCode === 255) {
217+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
218+
}
219+
220+
resolve(exitCode);
220221
// Cleanup runs automatically via DisposableProcess
221222
});
222223

223224
sshProcess.on("error", (err) => {
225+
// Spawn errors are connection-level failures
226+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${err.message}`);
224227
reject(new RuntimeErrorClass(`Failed to execute SSH command: ${err.message}`, "exec", err));
225228
});
226229
});
@@ -404,6 +407,9 @@ export class SSHRuntime implements Runtime {
404407
* @private
405408
*/
406409
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
410+
// Ensure connection is healthy before executing
411+
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
412+
407413
const sshArgs = this.buildSSHArgs();
408414
sshArgs.push(this.config.host, command);
409415

@@ -435,6 +441,10 @@ export class SSHRuntime implements Runtime {
435441
if (timedOut) return; // Already rejected
436442

437443
if (code !== 0) {
444+
// SSH exit code 255 indicates connection failure - report to pool for backoff
445+
if (code === 255) {
446+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
447+
}
438448
reject(new RuntimeErrorClass(`SSH command failed: ${stderr.trim()}`, "network"));
439449
return;
440450
}
@@ -447,6 +457,8 @@ export class SSHRuntime implements Runtime {
447457
clearTimeout(timer);
448458
if (timedOut) return; // Already rejected
449459

460+
// Spawn errors are connection-level failures
461+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${getErrorMessage(err)}`);
450462
reject(
451463
new RuntimeErrorClass(
452464
`Cannot execute SSH command: ${getErrorMessage(err)}`,

src/node/runtime/sshConnectionPool.test.ts

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as os from "os";
22
import * as path from "path";
3-
import { getControlPath } from "./sshConnectionPool";
4-
import type { SSHRuntimeConfig } from "./SSHRuntime";
3+
import { getControlPath, SSHConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
54

65
describe("sshConnectionPool", () => {
76
describe("getControlPath", () => {
@@ -134,3 +133,151 @@ describe("username isolation", () => {
134133
expect(controlPath).toMatch(/mux-ssh-[a-f0-9]{12}$/);
135134
});
136135
});
136+
137+
describe("SSHConnectionPool", () => {
138+
describe("health tracking", () => {
139+
test("getConnectionHealth returns undefined for unknown connection", () => {
140+
const pool = new SSHConnectionPool();
141+
const config: SSHRuntimeConfig = {
142+
host: "unknown.example.com",
143+
srcBaseDir: "/work",
144+
};
145+
146+
expect(pool.getConnectionHealth(config)).toBeUndefined();
147+
});
148+
149+
test("markHealthy sets connection to healthy state", () => {
150+
const pool = new SSHConnectionPool();
151+
const config: SSHRuntimeConfig = {
152+
host: "test.example.com",
153+
srcBaseDir: "/work",
154+
};
155+
156+
pool.markHealthy(config);
157+
const health = pool.getConnectionHealth(config);
158+
159+
expect(health).toBeDefined();
160+
expect(health!.status).toBe("healthy");
161+
expect(health!.consecutiveFailures).toBe(0);
162+
expect(health!.lastSuccess).toBeInstanceOf(Date);
163+
});
164+
165+
test("reportFailure puts connection into backoff", () => {
166+
const pool = new SSHConnectionPool();
167+
const config: SSHRuntimeConfig = {
168+
host: "test.example.com",
169+
srcBaseDir: "/work",
170+
};
171+
172+
// Mark healthy first
173+
pool.markHealthy(config);
174+
expect(pool.getConnectionHealth(config)?.status).toBe("healthy");
175+
176+
// Report a failure
177+
pool.reportFailure(config, "Connection refused");
178+
const health = pool.getConnectionHealth(config);
179+
180+
expect(health?.status).toBe("unhealthy");
181+
expect(health?.consecutiveFailures).toBe(1);
182+
expect(health?.lastError).toBe("Connection refused");
183+
expect(health?.backoffUntil).toBeDefined();
184+
});
185+
186+
test("resetBackoff clears backoff state after failed probe", async () => {
187+
const pool = new SSHConnectionPool();
188+
const config: SSHRuntimeConfig = {
189+
host: "nonexistent.invalid.host.test",
190+
srcBaseDir: "/work",
191+
};
192+
193+
// Trigger a failure via acquireConnection (will fail to connect)
194+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
195+
196+
// Verify we're now in backoff
197+
const healthBefore = pool.getConnectionHealth(config);
198+
expect(healthBefore?.status).toBe("unhealthy");
199+
expect(healthBefore?.backoffUntil).toBeDefined();
200+
201+
// Reset backoff
202+
pool.resetBackoff(config);
203+
const healthAfter = pool.getConnectionHealth(config);
204+
205+
expect(healthAfter).toBeDefined();
206+
expect(healthAfter!.status).toBe("unknown");
207+
expect(healthAfter!.consecutiveFailures).toBe(0);
208+
expect(healthAfter!.backoffUntil).toBeUndefined();
209+
});
210+
});
211+
212+
describe("acquireConnection", () => {
213+
test("returns immediately for known healthy connection", async () => {
214+
const pool = new SSHConnectionPool();
215+
const config: SSHRuntimeConfig = {
216+
host: "test.example.com",
217+
srcBaseDir: "/work",
218+
};
219+
220+
// Mark as healthy first
221+
pool.markHealthy(config);
222+
223+
// Should return immediately without probing
224+
const start = Date.now();
225+
await pool.acquireConnection(config);
226+
const elapsed = Date.now() - start;
227+
228+
// Should be nearly instant (< 50ms)
229+
expect(elapsed).toBeLessThan(50);
230+
});
231+
232+
test("throws immediately when in backoff", async () => {
233+
const pool = new SSHConnectionPool();
234+
const config: SSHRuntimeConfig = {
235+
host: "nonexistent.invalid.host.test",
236+
srcBaseDir: "/work",
237+
};
238+
239+
// Trigger a failure to put connection in backoff
240+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
241+
242+
// Second call should throw immediately with backoff message
243+
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
244+
});
245+
246+
test("getControlPath returns deterministic path", () => {
247+
const pool = new SSHConnectionPool();
248+
const config: SSHRuntimeConfig = {
249+
host: "test.example.com",
250+
srcBaseDir: "/work",
251+
};
252+
253+
const path1 = pool.getControlPath(config);
254+
const path2 = pool.getControlPath(config);
255+
256+
expect(path1).toBe(path2);
257+
expect(path1).toBe(getControlPath(config));
258+
});
259+
});
260+
261+
describe("singleflighting", () => {
262+
test("concurrent acquireConnection calls share same probe", async () => {
263+
const pool = new SSHConnectionPool();
264+
const config: SSHRuntimeConfig = {
265+
host: "test.example.com",
266+
srcBaseDir: "/work",
267+
};
268+
269+
// Mark healthy to avoid actual probe
270+
pool.markHealthy(config);
271+
272+
// Multiple concurrent calls should all succeed
273+
const results = await Promise.all([
274+
pool.acquireConnection(config),
275+
pool.acquireConnection(config),
276+
pool.acquireConnection(config),
277+
]);
278+
279+
// All should resolve (no errors)
280+
expect(results).toHaveLength(3);
281+
});
282+
});
283+
});

0 commit comments

Comments
(0)