Skip to content

Commit 7e7d883

Browse files
authored
🤖 feat: SSH connection pool with backoff and singleflighting (#922)
Prevents thundering herd issues with SSH connections by adding health tracking, exponential backoff, and singleflighting to the connection pool. ## Changes - **SSHConnectionPool class** with: - Health tracking (healthy/unhealthy/unknown states) - Exponential backoff: 1s → 5s → 10s → 20s → 40s → 60s (cap) - Singleflighting: concurrent probes to same host share one attempt - Fast-path for known-healthy connections (no re-probe) - **Integration points**: - `SSHRuntime.exec()` and `execSSHCommand()` call `acquireConnection()` - `PTYService` calls `acquireConnection()` before spawning SSH terminals ## Flow ``` acquireConnection() → in backoff? → throw immediately → known healthy? → return immediately → inflight probe? → wait on existing promise → start probe → success? → mark healthy, return → failure? → mark failed + backoff, throw ``` _Generated with `mux`_
1 parent 620aced commit 7e7d883

File tree

5 files changed

+484
-34
lines changed

5 files changed

+484
-34
lines changed

src/node/runtime/SSHRuntime.ts

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion";
2424
import { getProjectName, execBuffered } from "@/node/utils/runtime/helpers";
2525
import { getErrorMessage } from "@/common/utils/errors";
2626
import { execAsync, DisposableProcess } from "@/node/utils/disposableExec";
27-
import { getControlPath } from "./sshConnectionPool";
27+
import { getControlPath, sshConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
2828
import { getBashPath } from "@/node/utils/main/bashPath";
2929

3030
/**
@@ -41,21 +41,8 @@ const shescape = {
4141
},
4242
};
4343

44-
/**
45-
* SSH Runtime Configuration
46-
*/
47-
export interface SSHRuntimeConfig {
48-
/** SSH host (can be hostname, user@host, or SSH config alias) */
49-
host: string;
50-
/** Working directory on remote host */
51-
srcBaseDir: string;
52-
/** Directory on remote for background process output (default: /tmp/mux-bashes) */
53-
bgOutputDir?: string;
54-
/** Optional: Path to SSH private key (if not using ~/.ssh/config or ssh-agent) */
55-
identityFile?: string;
56-
/** Optional: SSH port (default: 22) */
57-
port?: number;
58-
}
44+
// Re-export SSHRuntimeConfig from connection pool (defined there to avoid circular deps)
45+
export type { SSHRuntimeConfig } from "./sshConnectionPool";
5946

6047
/**
6148
* SSH runtime implementation that executes commands and file operations
@@ -127,7 +114,6 @@ export class SSHRuntime implements Runtime {
127114
/**
128115
* Execute command over SSH with streaming I/O
129116
*/
130-
// eslint-disable-next-line @typescript-eslint/require-await
131117
async exec(command: string, options: ExecOptions): Promise<ExecStream> {
132118
const startTime = performance.now();
133119

@@ -136,6 +122,10 @@ export class SSHRuntime implements Runtime {
136122
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
137123
}
138124

125+
// Ensure connection is healthy before executing
126+
// This provides backoff protection and singleflighting for concurrent requests
127+
await sshConnectionPool.acquireConnection(this.config);
128+
139129
// Build command parts
140130
const parts: string[] = [];
141131

@@ -233,11 +223,22 @@ export class SSHRuntime implements Runtime {
233223
resolve(EXIT_CODE_TIMEOUT);
234224
return;
235225
}
236-
resolve(code ?? (signal ? -1 : 0));
226+
227+
const exitCode = code ?? (signal ? -1 : 0);
228+
229+
// SSH exit code 255 indicates connection failure - report to pool for backoff
230+
// This prevents thundering herd when a previously healthy host goes down
231+
if (exitCode === 255) {
232+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
233+
}
234+
235+
resolve(exitCode);
237236
// Cleanup runs automatically via DisposableProcess
238237
});
239238

240239
sshProcess.on("error", (err) => {
240+
// Spawn errors are connection-level failures
241+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${err.message}`);
241242
reject(new RuntimeErrorClass(`Failed to execute SSH command: ${err.message}`, "exec", err));
242243
});
243244
});
@@ -427,6 +428,9 @@ export class SSHRuntime implements Runtime {
427428
* @private
428429
*/
429430
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
431+
// Ensure connection is healthy before executing
432+
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
433+
430434
const sshArgs = this.buildSSHArgs();
431435
sshArgs.push(this.config.host, command);
432436

@@ -461,6 +465,10 @@ export class SSHRuntime implements Runtime {
461465
if (timedOut) return; // Already rejected
462466

463467
if (code !== 0) {
468+
// SSH exit code 255 indicates connection failure - report to pool for backoff
469+
if (code === 255) {
470+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
471+
}
464472
reject(new RuntimeErrorClass(`SSH command failed: ${stderr.trim()}`, "network"));
465473
return;
466474
}
@@ -473,6 +481,8 @@ export class SSHRuntime implements Runtime {
473481
clearTimeout(timer);
474482
if (timedOut) return; // Already rejected
475483

484+
// Spawn errors are connection-level failures
485+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${getErrorMessage(err)}`);
476486
reject(
477487
new RuntimeErrorClass(
478488
`Cannot execute SSH command: ${getErrorMessage(err)}`,

src/node/runtime/sshConnectionPool.test.ts

Lines changed: 153 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as os from "os";
22
import * as path from "path";
3-
import { getControlPath } from "./sshConnectionPool";
4-
import type { SSHRuntimeConfig } from "./SSHRuntime";
3+
import { getControlPath, SSHConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
54

65
describe("sshConnectionPool", () => {
76
describe("getControlPath", () => {
@@ -59,7 +58,9 @@ describe("sshConnectionPool", () => {
5958
expect(getControlPath(config1)).not.toBe(getControlPath(config2));
6059
});
6160

62-
test("different srcBaseDirs produce different controlPaths", () => {
61+
test("different srcBaseDirs produce same controlPaths (connection shared)", () => {
62+
// srcBaseDir is intentionally excluded from connection key -
63+
// workspaces on the same host share health tracking and multiplexing
6364
const config1: SSHRuntimeConfig = {
6465
host: "test.com",
6566
srcBaseDir: "/work1",
@@ -69,7 +70,7 @@ describe("sshConnectionPool", () => {
6970
srcBaseDir: "/work2",
7071
};
7172

72-
expect(getControlPath(config1)).not.toBe(getControlPath(config2));
73+
expect(getControlPath(config1)).toBe(getControlPath(config2));
7374
});
7475

7576
test("controlPath is in tmpdir with expected format", () => {
@@ -134,3 +135,151 @@ describe("username isolation", () => {
134135
expect(controlPath).toMatch(/mux-ssh-[a-f0-9]{12}$/);
135136
});
136137
});
138+
139+
describe("SSHConnectionPool", () => {
140+
describe("health tracking", () => {
141+
test("getConnectionHealth returns undefined for unknown connection", () => {
142+
const pool = new SSHConnectionPool();
143+
const config: SSHRuntimeConfig = {
144+
host: "unknown.example.com",
145+
srcBaseDir: "/work",
146+
};
147+
148+
expect(pool.getConnectionHealth(config)).toBeUndefined();
149+
});
150+
151+
test("markHealthy sets connection to healthy state", () => {
152+
const pool = new SSHConnectionPool();
153+
const config: SSHRuntimeConfig = {
154+
host: "test.example.com",
155+
srcBaseDir: "/work",
156+
};
157+
158+
pool.markHealthy(config);
159+
const health = pool.getConnectionHealth(config);
160+
161+
expect(health).toBeDefined();
162+
expect(health!.status).toBe("healthy");
163+
expect(health!.consecutiveFailures).toBe(0);
164+
expect(health!.lastSuccess).toBeInstanceOf(Date);
165+
});
166+
167+
test("reportFailure puts connection into backoff", () => {
168+
const pool = new SSHConnectionPool();
169+
const config: SSHRuntimeConfig = {
170+
host: "test.example.com",
171+
srcBaseDir: "/work",
172+
};
173+
174+
// Mark healthy first
175+
pool.markHealthy(config);
176+
expect(pool.getConnectionHealth(config)?.status).toBe("healthy");
177+
178+
// Report a failure
179+
pool.reportFailure(config, "Connection refused");
180+
const health = pool.getConnectionHealth(config);
181+
182+
expect(health?.status).toBe("unhealthy");
183+
expect(health?.consecutiveFailures).toBe(1);
184+
expect(health?.lastError).toBe("Connection refused");
185+
expect(health?.backoffUntil).toBeDefined();
186+
});
187+
188+
test("resetBackoff clears backoff state after failed probe", async () => {
189+
const pool = new SSHConnectionPool();
190+
const config: SSHRuntimeConfig = {
191+
host: "nonexistent.invalid.host.test",
192+
srcBaseDir: "/work",
193+
};
194+
195+
// Trigger a failure via acquireConnection (will fail to connect)
196+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
197+
198+
// Verify we're now in backoff
199+
const healthBefore = pool.getConnectionHealth(config);
200+
expect(healthBefore?.status).toBe("unhealthy");
201+
expect(healthBefore?.backoffUntil).toBeDefined();
202+
203+
// Reset backoff
204+
pool.resetBackoff(config);
205+
const healthAfter = pool.getConnectionHealth(config);
206+
207+
expect(healthAfter).toBeDefined();
208+
expect(healthAfter!.status).toBe("unknown");
209+
expect(healthAfter!.consecutiveFailures).toBe(0);
210+
expect(healthAfter!.backoffUntil).toBeUndefined();
211+
});
212+
});
213+
214+
describe("acquireConnection", () => {
215+
test("returns immediately for known healthy connection", async () => {
216+
const pool = new SSHConnectionPool();
217+
const config: SSHRuntimeConfig = {
218+
host: "test.example.com",
219+
srcBaseDir: "/work",
220+
};
221+
222+
// Mark as healthy first
223+
pool.markHealthy(config);
224+
225+
// Should return immediately without probing
226+
const start = Date.now();
227+
await pool.acquireConnection(config);
228+
const elapsed = Date.now() - start;
229+
230+
// Should be nearly instant (< 50ms)
231+
expect(elapsed).toBeLessThan(50);
232+
});
233+
234+
test("throws immediately when in backoff", async () => {
235+
const pool = new SSHConnectionPool();
236+
const config: SSHRuntimeConfig = {
237+
host: "nonexistent.invalid.host.test",
238+
srcBaseDir: "/work",
239+
};
240+
241+
// Trigger a failure to put connection in backoff
242+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
243+
244+
// Second call should throw immediately with backoff message
245+
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
246+
});
247+
248+
test("getControlPath returns deterministic path", () => {
249+
const pool = new SSHConnectionPool();
250+
const config: SSHRuntimeConfig = {
251+
host: "test.example.com",
252+
srcBaseDir: "/work",
253+
};
254+
255+
const path1 = pool.getControlPath(config);
256+
const path2 = pool.getControlPath(config);
257+
258+
expect(path1).toBe(path2);
259+
expect(path1).toBe(getControlPath(config));
260+
});
261+
});
262+
263+
describe("singleflighting", () => {
264+
test("concurrent acquireConnection calls share same probe", async () => {
265+
const pool = new SSHConnectionPool();
266+
const config: SSHRuntimeConfig = {
267+
host: "nonexistent.invalid.host.test",
268+
srcBaseDir: "/work",
269+
};
270+
271+
// All concurrent calls should share the same probe and get same result
272+
const results = await Promise.allSettled([
273+
pool.acquireConnection(config, 1000),
274+
pool.acquireConnection(config, 1000),
275+
pool.acquireConnection(config, 1000),
276+
]);
277+
278+
// All should be rejected (connection fails)
279+
expect(results.every((r) => r.status === "rejected")).toBe(true);
280+
281+
// Only 1 failure should be recorded (not 3) - proves singleflighting worked
282+
expect(pool.getConnectionHealth(config)?.consecutiveFailures).toBe(1);
283+
});
284+
});
285+
});

0 commit comments

Comments
 (0)