Skip to content

Commit 24d3a68

Browse files
authored
refactor: remove provider-level lock from stream manager (#158)
Removed the provider mutex and release callbacks from stream state. Workspace mutex already serializes startStream calls, so the extra provider lock only added bookkeeping without guarding new behavior. Also removed cleanup paths that were exclusively handling the provider.
1 parent 8f427c3 commit 24d3a68

File tree

1 file changed

+2
-58
lines changed

1 file changed

+2
-58
lines changed

src/services/streamManager.ts

Lines changed: 2 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ interface WorkspaceStreamInfo {
9393
model: string;
9494
initialMetadata?: Partial<CmuxMetadata>;
9595
historySequence: number;
96-
providerName?: string;
97-
releaseProviderLock?: () => void;
9896
// Track accumulated parts for partial message (includes reasoning, text, and tools)
9997
parts: CompletedMessagePart[];
10098
// Track last partial write time for throttling
@@ -118,7 +116,6 @@ interface WorkspaceStreamInfo {
118116
export class StreamManager extends EventEmitter {
119117
private workspaceStreams = new Map<WorkspaceId, WorkspaceStreamInfo>();
120118
private streamLocks = new Map<WorkspaceId, AsyncMutex>();
121-
private providerLocks = new Map<string, Promise<void>>();
122119
private readonly PARTIAL_WRITE_THROTTLE_MS = 500;
123120
private readonly historyService: HistoryService;
124121
private readonly partialService: PartialService;
@@ -129,37 +126,6 @@ export class StreamManager extends EventEmitter {
129126
this.partialService = partialService;
130127
}
131128

132-
/**
133-
* Acquire a provider-level lock to throttle concurrent provider streams.
134-
* Currently only enforced for OpenAI (provider = "openai").
135-
*/
136-
private async acquireProviderLock(providerName: string): Promise<() => void> {
137-
if (providerName !== "openai") {
138-
return () => undefined;
139-
}
140-
141-
const previous = this.providerLocks.get(providerName) ?? Promise.resolve();
142-
143-
let release!: () => void;
144-
const current = new Promise<void>((resolve) => {
145-
release = resolve;
146-
});
147-
const chain = previous.then(() => current);
148-
this.providerLocks.set(providerName, chain);
149-
150-
await previous;
151-
152-
let released = false;
153-
return () => {
154-
if (released) return;
155-
released = true;
156-
release();
157-
if (this.providerLocks.get(providerName) === chain) {
158-
this.providerLocks.delete(providerName);
159-
}
160-
};
161-
}
162-
163129
/**
164130
* Write the current partial message to disk (throttled by mtime)
165131
* Ensures writes happen during rapid streaming (crash-resilient)
@@ -751,10 +717,6 @@ export class StreamManager extends EventEmitter {
751717
clearTimeout(streamInfo.partialWriteTimer);
752718
streamInfo.partialWriteTimer = undefined;
753719
}
754-
if (streamInfo.releaseProviderLock) {
755-
streamInfo.releaseProviderLock();
756-
streamInfo.releaseProviderLock = undefined;
757-
}
758720
this.workspaceStreams.delete(workspaceId);
759721
}
760722
}
@@ -888,7 +850,6 @@ export class StreamManager extends EventEmitter {
888850
}
889851
const mutex = this.streamLocks.get(typedWorkspaceId)!;
890852

891-
let releaseProviderLock: (() => void) | undefined;
892853
try {
893854
// Acquire lock - guarantees only one startStream per workspace
894855
// Lock is automatically released when scope exits via Symbol.asyncDispose
@@ -901,12 +862,7 @@ export class StreamManager extends EventEmitter {
901862

902863
// Step 1: Atomic safety check (cancels any existing stream and waits for full exit)
903864
const streamToken = await this.ensureStreamSafety(typedWorkspaceId);
904-
905-
// Step 2: Acquire provider-level lock to prevent overlapping OpenAI streams
906-
const providerName = modelString.split(":")[0] ?? "";
907-
releaseProviderLock = await this.acquireProviderLock(providerName);
908-
909-
// Step 3: Atomic stream creation and registration
865+
// Step 2: Atomic stream creation and registration
910866
const streamInfo = this.createStreamAtomically(
911867
typedWorkspaceId,
912868
streamToken,
@@ -922,10 +878,8 @@ export class StreamManager extends EventEmitter {
922878
maxOutputTokens,
923879
toolPolicy
924880
);
925-
streamInfo.providerName = providerName;
926-
streamInfo.releaseProviderLock = releaseProviderLock;
927881

928-
// Step 4: Track the processing promise for guaranteed cleanup
882+
// Step 3: Track the processing promise for guaranteed cleanup
929883
// This allows cancelStreamSafely to wait for full exit
930884
streamInfo.processingPromise = this.processStreamWithCleanup(
931885
typedWorkspaceId,
@@ -937,16 +891,6 @@ export class StreamManager extends EventEmitter {
937891

938892
return Ok(streamToken);
939893
} catch (error) {
940-
// Release provider lock if acquired before failure
941-
if (releaseProviderLock) {
942-
releaseProviderLock();
943-
releaseProviderLock = undefined;
944-
}
945-
const existing = this.workspaceStreams.get(typedWorkspaceId);
946-
if (existing?.releaseProviderLock) {
947-
existing.releaseProviderLock();
948-
existing.releaseProviderLock = undefined;
949-
}
950894
// Guaranteed cleanup on any failure
951895
this.workspaceStreams.delete(typedWorkspaceId);
952896
// Convert to strongly-typed error

0 commit comments

Comments
 (0)