Skip to content

Commit 7ff598c

Browse files
authored
🤖 perf: reduce memory pressure from streaming message aggregation (#1151)
## Summary Performance optimizations to reduce memory pressure and GC pauses caused by inefficient string handling during streaming. ## Problem Heap analysis revealed: - **582 MB in 30.5M concatenated strings** - each streaming delta was stored as a separate part, and merging used repeated `+` concatenation creating O(n²) intermediate strings - **1.39s Major GC pauses** during animation events due to memory pressure - **35M total heap objects** accumulated during typical usage ## Changes ### 1. Stabilize `foregroundToolCallIds` Set reference Avoid invalidating React Compiler memoization by checking content equality before creating new Set instances. ### 2. Use `array.join()` for message part merging Replace O(n²) string concatenation with array accumulation. V8 optimizes `join()` much better than repeated `+`. ### 3. Compact message parts on stream end When streaming completes, merge thousands of delta parts into single strings immediately. This converts memory from O(deltas) small objects to O(content_types) merged objects, preventing accumulation. ### 4. Extract `mergeAdjacentParts()` helper Deduplicate the merge logic between `compactMessageParts()` and `getDisplayedMessages()`. ## Validation - `make static-check` passes - `make typecheck` passes - Heap timeline analysis confirms these patterns as the primary memory sources --- _Generated with `mux` • Model: `claude-sonnet-4-20250514` • Thinking: `low`_
1 parent 10e73fe commit 7ff598c

File tree

3 files changed

+90
-27
lines changed

3 files changed

+90
-27
lines changed

‎docs/AGENTS.md‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ gh pr view <number> --json mergeable,mergeStateStatus | jq '.'
5555

5656
- Core UX: projects sidebar (left panel), workspace management (local git worktrees or SSH clones), config stored in `~/.mux/config.json`.
5757
- Fetch bulk data in one IPC call—no O(n) frontend→backend loops.
58+
- **React Compiler enabled** — auto-memoization handles components/hooks; do not add manual `React.memo()`, `useMemo`, or `useCallback` for memoization purposes. Focus instead on fixing unstable object references that the compiler cannot optimize (e.g., `new Set()` in state setters, inline object literals as props).
5859

5960
## Tooling & Commands
6061

‎src/browser/hooks/useBackgroundBashHandlers.ts‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,14 @@ export function useBackgroundBashHandlers(
108108
if (signal.aborted) break;
109109

110110
setProcesses(state.processes);
111-
setForegroundToolCallIds(new Set(state.foregroundToolCallIds));
111+
// Only update if contents changed to avoid invalidating React Compiler memoization
112+
setForegroundToolCallIds((prev) => {
113+
const arr = state.foregroundToolCallIds;
114+
if (prev.size === arr.length && arr.every((id) => prev.has(id))) {
115+
return prev;
116+
}
117+
return new Set(arr);
118+
});
112119

113120
// Clear terminating IDs for processes that are no longer running
114121
// (killed/exited/failed should clear so new processes with same name aren't affected)

‎src/browser/utils/messages/StreamingMessageAggregator.ts‎

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,66 @@ function hasFailureResult(result: unknown): boolean {
7474
return false;
7575
}
7676

77+
/**
78+
* Merge adjacent text/reasoning parts using array accumulation + join().
79+
* Avoids O(n²) string allocations from repeated concatenation.
80+
* Tool parts are preserved as-is between merged text/reasoning runs.
81+
*/
82+
function mergeAdjacentParts(parts: MuxMessage["parts"]): MuxMessage["parts"] {
83+
if (parts.length <= 1) return parts;
84+
85+
const merged: MuxMessage["parts"] = [];
86+
let pendingTexts: string[] = [];
87+
let pendingTextTimestamp: number | undefined;
88+
let pendingReasonings: string[] = [];
89+
let pendingReasoningTimestamp: number | undefined;
90+
91+
const flushText = () => {
92+
if (pendingTexts.length > 0) {
93+
merged.push({
94+
type: "text",
95+
text: pendingTexts.join(""),
96+
timestamp: pendingTextTimestamp,
97+
});
98+
pendingTexts = [];
99+
pendingTextTimestamp = undefined;
100+
}
101+
};
102+
103+
const flushReasoning = () => {
104+
if (pendingReasonings.length > 0) {
105+
merged.push({
106+
type: "reasoning",
107+
text: pendingReasonings.join(""),
108+
timestamp: pendingReasoningTimestamp,
109+
});
110+
pendingReasonings = [];
111+
pendingReasoningTimestamp = undefined;
112+
}
113+
};
114+
115+
for (const part of parts) {
116+
if (part.type === "text") {
117+
flushReasoning();
118+
pendingTexts.push(part.text);
119+
pendingTextTimestamp ??= part.timestamp;
120+
} else if (part.type === "reasoning") {
121+
flushText();
122+
pendingReasonings.push(part.text);
123+
pendingReasoningTimestamp ??= part.timestamp;
124+
} else {
125+
// Tool part - flush and keep as-is
126+
flushText();
127+
flushReasoning();
128+
merged.push(part);
129+
}
130+
}
131+
flushText();
132+
flushReasoning();
133+
134+
return merged;
135+
}
136+
77137
export class StreamingMessageAggregator {
78138
private messages = new Map<string, MuxMessage>();
79139
private activeStreams = new Map<string, StreamingContext>();
@@ -288,6 +348,15 @@ export class StreamingMessageAggregator {
288348
this.currentTodos = [];
289349
}
290350

351+
/**
352+
* Compact a message's parts array by merging adjacent text/reasoning parts.
353+
* Called when streaming ends to convert thousands of delta parts into single strings.
354+
* This reduces memory from O(deltas) small objects to O(content_types) merged objects.
355+
*/
356+
private compactMessageParts(message: MuxMessage): void {
357+
message.parts = mergeAdjacentParts(message.parts);
358+
}
359+
291360
addMessage(message: MuxMessage): void {
292361
const existing = this.messages.get(message.id);
293362
if (existing) {
@@ -545,6 +614,10 @@ export class StreamingMessageAggregator {
545614
}
546615
}
547616
}
617+
618+
// Compact parts to merge adjacent text/reasoning deltas into single strings
619+
// This reduces memory from thousands of small delta objects to a few merged objects
620+
this.compactMessageParts(message);
548621
}
549622

550623
// Clean up stream-scoped state (active stream tracking, TODOs)
@@ -587,6 +660,9 @@ export class StreamingMessageAggregator {
587660
partial: true,
588661
...data.metadata, // Spread abort metadata (usage, duration)
589662
};
663+
664+
// Compact parts even on abort - still reduces memory for partial messages
665+
this.compactMessageParts(message);
590666
}
591667

592668
// Clean up stream-scoped state (active stream tracking, TODOs)
@@ -606,6 +682,9 @@ export class StreamingMessageAggregator {
606682
message.metadata.partial = true;
607683
message.metadata.error = data.error;
608684
message.metadata.errorType = data.errorType;
685+
686+
// Compact parts even on error - still reduces memory for partial messages
687+
this.compactMessageParts(message);
609688
}
610689

611690
// Clean up stream-scoped state (active stream tracking, TODOs)
@@ -957,32 +1036,8 @@ export class StreamingMessageAggregator {
9571036
// Direct Map.has() check - O(1) instead of O(n) iteration
9581037
const hasActiveStream = this.activeStreams.has(message.id);
9591038

960-
// Merge adjacent parts of same type (text with text, reasoning with reasoning)
961-
// This is where all merging happens - streaming just appends raw deltas
962-
const mergedParts: typeof message.parts = [];
963-
for (const part of message.parts) {
964-
const lastMerged = mergedParts[mergedParts.length - 1];
965-
966-
// Try to merge with last part if same type
967-
if (lastMerged?.type === "text" && part.type === "text") {
968-
// Merge text parts, preserving the first timestamp
969-
mergedParts[mergedParts.length - 1] = {
970-
type: "text",
971-
text: lastMerged.text + part.text,
972-
timestamp: lastMerged.timestamp ?? part.timestamp,
973-
};
974-
} else if (lastMerged?.type === "reasoning" && part.type === "reasoning") {
975-
// Merge reasoning parts, preserving the first timestamp
976-
mergedParts[mergedParts.length - 1] = {
977-
type: "reasoning",
978-
text: lastMerged.text + part.text,
979-
timestamp: lastMerged.timestamp ?? part.timestamp,
980-
};
981-
} else {
982-
// Different type or tool part - add new part
983-
mergedParts.push(part);
984-
}
985-
}
1039+
// Merge adjacent text/reasoning parts for display
1040+
const mergedParts = mergeAdjacentParts(message.parts);
9861041

9871042
// Find the last part that will produce a DisplayedMessage
9881043
// (reasoning, text parts with content, OR tool parts)

0 commit comments

Comments
 (0)