Skip to content

Commit aef2b0a

Browse files
committed
Review comments: better error handling and documentation
1 parent 40b93bc commit aef2b0a

File tree

3 files changed

+66
-14
lines changed

3 files changed

+66
-14
lines changed

src/api/coderApi.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ export class CoderApi extends Api {
144144
vscode.workspace.getConfiguration(),
145145
);
146146

147+
/**
148+
* Similar to the REST client, we want to prioritize headers in this order (highest to lowest):
149+
* 1. Headers from the header command
150+
* 2. Any headers passed directly to this function
151+
* 3. Coder session token from the Api client (if set)
152+
*/
147153
const headers = {
148154
...(token ? { [coderSessionTokenHeader]: token } : {}),
149155
...configs.options?.headers,
@@ -189,7 +195,9 @@ export class CoderApi extends Api {
189195
}
190196

191197
/**
192-
* Create a WebSocket connection with SSE fallback on 404
198+
* Create a WebSocket connection with SSE fallback on 404.
199+
*
200+
* Note: The fallback on SSE ignores all passed client options except the headers.
193201
*/
194202
private async createWebSocketWithFallback<TData = unknown>(configs: {
195203
apiRoute: string;
@@ -209,13 +217,15 @@ export class CoderApi extends Api {
209217
return this.createSseFallback<TData>(
210218
configs.fallbackApiRoute,
211219
configs.searchParams,
220+
configs.options?.headers,
212221
);
213222
}
214223

215224
return this.waitForConnection(webSocket, () =>
216225
this.createSseFallback<TData>(
217226
configs.fallbackApiRoute,
218227
configs.searchParams,
228+
configs.options?.headers,
219229
),
220230
);
221231
}
@@ -260,6 +270,7 @@ export class CoderApi extends Api {
260270
private async createSseFallback<TData = unknown>(
261271
apiRoute: string,
262272
searchParams?: Record<string, string> | URLSearchParams,
273+
optionsHeaders?: Record<string, string>,
263274
): Promise<UnidirectionalStream<TData>> {
264275
this.output.warn(`WebSocket failed, using SSE fallback: ${apiRoute}`);
265276

@@ -274,6 +285,8 @@ export class CoderApi extends Api {
274285
apiRoute,
275286
searchParams,
276287
axiosInstance: this.getAxiosInstance(),
288+
optionsHeaders: optionsHeaders,
289+
logger: this.output,
277290
});
278291

279292
this.attachStreamLogger(sseConnection);

src/api/streamingFetchAdapter.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { type IncomingMessage } from "http";
88
*/
99
export function createStreamingFetchAdapter(
1010
axiosInstance: AxiosInstance,
11+
configHeaders?: Record<string, string>,
1112
): (url: string | URL, init?: FetchLikeInit) => Promise<FetchLikeResponse> {
1213
return async (
1314
url: string | URL,
@@ -18,19 +19,27 @@ export function createStreamingFetchAdapter(
1819
const response = await axiosInstance.request<IncomingMessage>({
1920
url: urlStr,
2021
signal: init?.signal,
21-
headers: init?.headers,
22+
headers: { ...init?.headers, ...configHeaders },
2223
responseType: "stream",
2324
validateStatus: () => true, // Don't throw on any status code
2425
});
2526

2627
const stream = new ReadableStream({
2728
start(controller) {
2829
response.data.on("data", (chunk: Buffer) => {
29-
controller.enqueue(chunk);
30+
try {
31+
controller.enqueue(chunk);
32+
} catch {
33+
// Stream already closed or errored, ignore
34+
}
3035
});
3136

3237
response.data.on("end", () => {
33-
controller.close();
38+
try {
39+
controller.close();
40+
} catch {
41+
// Stream already closed, ignore
42+
}
3443
});
3544

3645
response.data.on("error", (err: Error) => {

src/websocket/sseConnection.ts

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { type WebSocketEventType } from "coder/site/src/utils/OneWayWebSocket";
44
import { EventSource } from "eventsource";
55

66
import { createStreamingFetchAdapter } from "../api/streamingFetchAdapter";
7+
import { type Logger } from "../logging/logger";
78

89
import { getQueryString } from "./utils";
910

@@ -24,11 +25,14 @@ export type SseConnectionInit = {
2425
location: { protocol: string; host: string };
2526
apiRoute: string;
2627
searchParams?: Record<string, string> | URLSearchParams;
28+
optionsHeaders?: Record<string, string>;
2729
axiosInstance: AxiosInstance;
30+
logger: Logger;
2831
};
2932

3033
export class SseConnection implements UnidirectionalStream<ServerSentEvent> {
3134
private readonly eventSource: EventSource;
35+
private readonly logger: Logger;
3236
private readonly callbacks = {
3337
open: new Set<EventHandler<ServerSentEvent, "open">>(),
3438
close: new Set<EventHandler<ServerSentEvent, "close">>(),
@@ -43,9 +47,13 @@ export class SseConnection implements UnidirectionalStream<ServerSentEvent> {
4347
public readonly url: string;
4448

4549
public constructor(init: SseConnectionInit) {
50+
this.logger = init.logger;
4651
this.url = this.buildUrl(init);
4752
this.eventSource = new EventSource(this.url, {
48-
fetch: createStreamingFetchAdapter(init.axiosInstance),
53+
fetch: createStreamingFetchAdapter(
54+
init.axiosInstance,
55+
init.optionsHeaders,
56+
),
4957
});
5058
this.setupEventHandlers();
5159
}
@@ -58,28 +66,48 @@ export class SseConnection implements UnidirectionalStream<ServerSentEvent> {
5866

5967
private setupEventHandlers(): void {
6068
this.eventSource.addEventListener("open", () =>
61-
this.callbacks.open.forEach((cb) => cb({} as WsEvent)),
69+
this.invokeCallbacks(this.callbacks.open, {} as WsEvent, "open"),
6270
);
6371

6472
this.eventSource.addEventListener("data", (event: MessageEvent) => {
65-
[...this.messageWrappers.values()].forEach((wrapper) => wrapper(event));
73+
this.invokeCallbacks(this.messageWrappers.values(), event, "message");
6674
});
6775

6876
this.eventSource.addEventListener("error", (error: Event | ErrorEvent) => {
69-
this.callbacks.error.forEach((cb) => cb(this.createErrorEvent(error)));
77+
this.invokeCallbacks(
78+
this.callbacks.error,
79+
this.createErrorEvent(error),
80+
"error",
81+
);
7082

7183
if (this.eventSource.readyState === EventSource.CLOSED) {
72-
this.callbacks.close.forEach((cb) =>
73-
cb({
84+
this.invokeCallbacks(
85+
this.callbacks.close,
86+
{
7487
code: 1006,
7588
reason: "Connection lost",
7689
wasClean: false,
77-
} as WsCloseEvent),
90+
} as WsCloseEvent,
91+
"close",
7892
);
7993
}
8094
});
8195
}
8296

97+
private invokeCallbacks<T>(
98+
callbacks: Iterable<(event: T) => void>,
99+
event: T,
100+
eventType: string,
101+
): void {
102+
for (const cb of callbacks) {
103+
try {
104+
cb(event);
105+
} catch (err) {
106+
this.logger.error(`Error in SSE ${eventType} callback:`, err);
107+
}
108+
}
109+
}
110+
83111
private createErrorEvent(event: Event | ErrorEvent): WsErrorEvent {
84112
const errorMessage =
85113
event instanceof ErrorEvent && event.message
@@ -177,12 +205,14 @@ export class SseConnection implements UnidirectionalStream<ServerSentEvent> {
177205

178206
public close(code?: number, reason?: string): void {
179207
this.eventSource.close();
180-
this.callbacks.close.forEach((cb) =>
181-
cb({
208+
this.invokeCallbacks(
209+
this.callbacks.close,
210+
{
182211
code: code ?? 1000,
183212
reason: reason ?? "Normal closure",
184213
wasClean: true,
185-
} as WsCloseEvent),
214+
} as WsCloseEvent,
215+
"close",
186216
);
187217

188218
Object.values(this.callbacks).forEach((callbackSet) => callbackSet.clear());

0 commit comments

Comments
 (0)