Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions src/node/runtime/SSHRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion";
import { getProjectName } from "@/node/utils/runtime/helpers";
import { getErrorMessage } from "@/common/utils/errors";
import { execAsync, DisposableProcess } from "@/node/utils/disposableExec";
import { getControlPath } from "./sshConnectionPool";
import { getControlPath, sshConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
import { getBashPath } from "@/node/utils/main/bashPath";

/**
Expand All @@ -40,19 +40,8 @@ const shescape = {
},
};

/**
* Connection settings for an SSH-backed runtime.
*
* NOTE(review): this diff also re-exports a type with the same name from
* "./sshConnectionPool"; confirm only one declaration of SSHRuntimeConfig
* remains in the final file.
*/
export interface SSHRuntimeConfig {
/** SSH destination: a hostname, a `user@host` string, or an alias defined in the SSH config. */
host: string;
/** Working directory on the remote host. */
srcBaseDir: string;
/** Optional path to an SSH private key; omit when relying on ~/.ssh/config or ssh-agent. */
identityFile?: string;
/** Optional SSH port (default: 22). */
port?: number;
}
// Re-export SSHRuntimeConfig from connection pool to maintain API compatibility
export type { SSHRuntimeConfig } from "./sshConnectionPool";

/**
* SSH runtime implementation that executes commands and file operations
Expand Down Expand Up @@ -92,7 +81,6 @@ export class SSHRuntime implements Runtime {
/**
* Execute command over SSH with streaming I/O
*/
// eslint-disable-next-line @typescript-eslint/require-await
async exec(command: string, options: ExecOptions): Promise<ExecStream> {
const startTime = performance.now();

Expand All @@ -101,6 +89,10 @@ export class SSHRuntime implements Runtime {
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
}

// Ensure connection is healthy before executing
// This provides backoff protection and singleflighting for concurrent requests
await sshConnectionPool.acquireConnection(this.config);

// Build command parts
const parts: string[] = [];

Expand Down Expand Up @@ -216,11 +208,22 @@ export class SSHRuntime implements Runtime {
resolve(EXIT_CODE_TIMEOUT);
return;
}
resolve(code ?? (signal ? -1 : 0));

const exitCode = code ?? (signal ? -1 : 0);

// SSH exit code 255 indicates connection failure - report to pool for backoff
// This prevents thundering herd when a previously healthy host goes down
if (exitCode === 255) {
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
}

resolve(exitCode);
// Cleanup runs automatically via DisposableProcess
});

sshProcess.on("error", (err) => {
// Spawn errors are connection-level failures
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${err.message}`);
reject(new RuntimeErrorClass(`Failed to execute SSH command: ${err.message}`, "exec", err));
});
});
Expand Down Expand Up @@ -404,6 +407,9 @@ export class SSHRuntime implements Runtime {
* @private
*/
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
// Ensure connection is healthy before executing
await sshConnectionPool.acquireConnection(this.config, timeoutMs);

const sshArgs = this.buildSSHArgs();
sshArgs.push(this.config.host, command);

Expand Down Expand Up @@ -435,6 +441,10 @@ export class SSHRuntime implements Runtime {
if (timedOut) return; // Already rejected

if (code !== 0) {
// SSH exit code 255 indicates connection failure - report to pool for backoff
if (code === 255) {
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
}
reject(new RuntimeErrorClass(`SSH command failed: ${stderr.trim()}`, "network"));
return;
}
Expand All @@ -447,6 +457,8 @@ export class SSHRuntime implements Runtime {
clearTimeout(timer);
if (timedOut) return; // Already rejected

// Spawn errors are connection-level failures
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${getErrorMessage(err)}`);
reject(
new RuntimeErrorClass(
`Cannot execute SSH command: ${getErrorMessage(err)}`,
Expand Down
151 changes: 149 additions & 2 deletions src/node/runtime/sshConnectionPool.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as os from "os";
import * as path from "path";
import { getControlPath } from "./sshConnectionPool";
import type { SSHRuntimeConfig } from "./SSHRuntime";
import { getControlPath, SSHConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";

describe("sshConnectionPool", () => {
describe("getControlPath", () => {
Expand Down Expand Up @@ -134,3 +133,151 @@ describe("username isolation", () => {
expect(controlPath).toMatch(/mux-ssh-[a-f0-9]{12}$/);
});
});

/**
 * Tests for SSHConnectionPool: per-connection health bookkeeping, backoff
 * gating in acquireConnection, and control-path derivation.
 *
 * Each test builds its own pool instance so health state never leaks
 * between tests.
 */
describe("SSHConnectionPool", () => {
  /** Builds a minimal runtime config for `host`; every test uses the same remote directory. */
  const makeConfig = (host: string): SSHRuntimeConfig => ({
    host,
    srcBaseDir: "/work",
  });

  // Host for tests that only touch in-memory health state.
  const TEST_HOST = "test.example.com";
  // Host under the reserved ".test" TLD; connection attempts to it are expected to fail.
  const UNREACHABLE_HOST = "nonexistent.invalid.host.test";

  describe("health tracking", () => {
    test("getConnectionHealth returns undefined for unknown connection", () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig("unknown.example.com");

      expect(pool.getConnectionHealth(config)).toBeUndefined();
    });

    test("markHealthy sets connection to healthy state", () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(TEST_HOST);

      pool.markHealthy(config);
      const health = pool.getConnectionHealth(config);

      // toBeDefined() guards the optional-chained assertions below, so a
      // missing record fails here rather than passing vacuously.
      expect(health).toBeDefined();
      expect(health?.status).toBe("healthy");
      expect(health?.consecutiveFailures).toBe(0);
      expect(health?.lastSuccess).toBeInstanceOf(Date);
    });

    test("reportFailure puts connection into backoff", () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(TEST_HOST);

      // Start from a healthy connection so the transition is observable.
      pool.markHealthy(config);
      expect(pool.getConnectionHealth(config)?.status).toBe("healthy");

      // A single reported failure should flip the state and schedule backoff.
      pool.reportFailure(config, "Connection refused");
      const health = pool.getConnectionHealth(config);

      expect(health?.status).toBe("unhealthy");
      expect(health?.consecutiveFailures).toBe(1);
      expect(health?.lastError).toBe("Connection refused");
      expect(health?.backoffUntil).toBeDefined();
    });

    test("resetBackoff clears backoff state after failed probe", async () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(UNREACHABLE_HOST);

      // Trigger a failure via acquireConnection (will fail to connect).
      await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();

      // The failed attempt must have left the connection in backoff.
      const healthBefore = pool.getConnectionHealth(config);
      expect(healthBefore?.status).toBe("unhealthy");
      expect(healthBefore?.backoffUntil).toBeDefined();

      // Resetting should return the record to a clean, not-yet-probed state.
      pool.resetBackoff(config);
      const healthAfter = pool.getConnectionHealth(config);

      expect(healthAfter).toBeDefined();
      expect(healthAfter?.status).toBe("unknown");
      expect(healthAfter?.consecutiveFailures).toBe(0);
      expect(healthAfter?.backoffUntil).toBeUndefined();
    });
  });

  describe("acquireConnection", () => {
    test("returns immediately for known healthy connection", async () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(TEST_HOST);

      // Mark as healthy first so no probe is needed.
      pool.markHealthy(config);

      const start = Date.now();
      await pool.acquireConnection(config);
      const elapsed = Date.now() - start;

      // Should be nearly instant (< 50ms); a real probe would take far longer.
      expect(elapsed).toBeLessThan(50);
    });

    test("throws immediately when in backoff", async () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(UNREACHABLE_HOST);

      // Trigger a failure to put the connection in backoff.
      await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();

      // Second call should be rejected by the backoff gate with a backoff message.
      await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
    });
  });

  // Grouped separately: this test exercises getControlPath and never calls
  // acquireConnection.
  describe("getControlPath", () => {
    test("getControlPath returns deterministic path", () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(TEST_HOST);

      const path1 = pool.getControlPath(config);
      const path2 = pool.getControlPath(config);

      // Same config gives the same path, matching the module-level helper.
      expect(path1).toBe(path2);
      expect(path1).toBe(getControlPath(config));
    });
  });

  describe("singleflighting", () => {
    test("concurrent acquireConnection calls share same probe", async () => {
      const pool = new SSHConnectionPool();
      const config = makeConfig(TEST_HOST);

      // Mark healthy to avoid actual probe.
      // NOTE(review): because the connection is pre-marked healthy, no probe
      // runs here, so this only verifies that concurrent calls all resolve.
      // It does not verify that they share a single in-flight probe. Covering
      // that would need a way to observe or stub the probe - TODO confirm
      // whether the pool exposes such a hook.
      pool.markHealthy(config);

      // Multiple concurrent calls should all succeed.
      const results = await Promise.all([
        pool.acquireConnection(config),
        pool.acquireConnection(config),
        pool.acquireConnection(config),
      ]);

      // All should resolve (Promise.all would have rejected otherwise).
      expect(results).toHaveLength(3);
    });
  });
});
Loading