Skip to content

Commit 07adc3e

Browse files
committed
Add SSE Connection and streamingFetchAdapter tests
1 parent f9b1f25 commit 07adc3e

File tree

3 files changed

+536
-5
lines changed

3 files changed

+536
-5
lines changed

src/websocket/sseConnection.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,10 @@ export class SseConnection implements UnidirectionalStream<ServerSentEvent> {
109109
}
110110

111111
private createErrorEvent(event: Event | ErrorEvent): WsErrorEvent {
112-
const errorMessage =
113-
event instanceof ErrorEvent && event.message
114-
? event.message
115-
: "SSE connection error";
116-
const error = event instanceof ErrorEvent ? event.error : undefined;
112+
// Check for properties instead of instanceof to avoid browser-only ErrorEvent global
113+
const eventWithMessage = event as { message?: string; error?: unknown };
114+
const errorMessage = eventWithMessage.message || "SSE connection error";
115+
const error = eventWithMessage.error;
117116

118117
return {
119118
error: error,
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import { type AxiosInstance } from "axios";
2+
import { EventEmitter } from "events";
3+
import { type ReaderLike } from "eventsource";
4+
import { type IncomingMessage } from "http";
5+
import { describe, it, expect, vi, beforeEach } from "vitest";
6+
7+
import { createStreamingFetchAdapter } from "@/api/streamingFetchAdapter";
8+
9+
const TEST_URL = "https://example.com/api";
10+
11+
describe("createStreamingFetchAdapter", () => {
12+
let mockAxios: AxiosInstance;
13+
14+
beforeEach(() => {
15+
vi.resetAllMocks();
16+
mockAxios = {
17+
request: vi.fn(),
18+
} as unknown as AxiosInstance;
19+
});
20+
21+
describe("Request Handling", () => {
22+
it("passes URL, signal, and responseType to axios", async () => {
23+
const mockStream = createMockStream();
24+
setupAxiosResponse(mockAxios, 200, {}, mockStream);
25+
26+
const adapter = createStreamingFetchAdapter(mockAxios);
27+
const signal = new AbortController().signal;
28+
29+
await adapter(TEST_URL, { signal });
30+
31+
expect(mockAxios.request).toHaveBeenCalledWith({
32+
url: TEST_URL,
33+
signal, // correctly passes signal
34+
headers: {},
35+
responseType: "stream",
36+
validateStatus: expect.any(Function),
37+
});
38+
});
39+
40+
it("applies headers in correct precedence order (config > init)", async () => {
41+
const mockStream = createMockStream();
42+
setupAxiosResponse(mockAxios, 200, {}, mockStream);
43+
44+
// Test 1: No config headers, only init headers
45+
const adapter1 = createStreamingFetchAdapter(mockAxios);
46+
await adapter1(TEST_URL, {
47+
headers: { "X-Init": "init-value" },
48+
});
49+
50+
expect(mockAxios.request).toHaveBeenCalledWith(
51+
expect.objectContaining({
52+
headers: { "X-Init": "init-value" },
53+
}),
54+
);
55+
56+
// Test 2: Config headers merge with init headers
57+
const adapter2 = createStreamingFetchAdapter(mockAxios, {
58+
"X-Config": "config-value",
59+
});
60+
await adapter2(TEST_URL, {
61+
headers: { "X-Init": "init-value" },
62+
});
63+
64+
expect(mockAxios.request).toHaveBeenCalledWith(
65+
expect.objectContaining({
66+
headers: {
67+
"X-Init": "init-value",
68+
"X-Config": "config-value",
69+
},
70+
}),
71+
);
72+
73+
// Test 3: Config headers override init headers
74+
const adapter3 = createStreamingFetchAdapter(mockAxios, {
75+
"X-Header": "config-value",
76+
});
77+
await adapter3(TEST_URL, {
78+
headers: { "X-Header": "init-value" },
79+
});
80+
81+
expect(mockAxios.request).toHaveBeenCalledWith(
82+
expect.objectContaining({
83+
headers: { "X-Header": "config-value" },
84+
}),
85+
);
86+
});
87+
});
88+
89+
describe("Response Properties", () => {
90+
it("returns response with correct properties", async () => {
91+
const mockStream = createMockStream();
92+
setupAxiosResponse(
93+
mockAxios,
94+
200,
95+
{ "content-type": "text/event-stream" },
96+
mockStream,
97+
);
98+
99+
const adapter = createStreamingFetchAdapter(mockAxios);
100+
const response = await adapter(TEST_URL);
101+
102+
expect(response.url).toBe(TEST_URL);
103+
expect(response.status).toBe(200);
104+
expect(response.headers.get("content-type")).toBe("text/event-stream");
105+
expect(response.headers.get("CoNtEnT-TyPe")).toBe("text/event-stream");
106+
expect(response.body?.getReader).toBeDefined();
107+
});
108+
109+
it("detects redirected requests", async () => {
110+
const mockStream = createMockStream();
111+
const mockResponse = {
112+
status: 200,
113+
headers: {},
114+
data: mockStream,
115+
request: {
116+
res: {
117+
responseUrl: "https://redirect.com/api",
118+
},
119+
},
120+
};
121+
vi.mocked(mockAxios.request).mockResolvedValue(mockResponse);
122+
123+
const adapter = createStreamingFetchAdapter(mockAxios);
124+
const response = await adapter(TEST_URL);
125+
126+
expect(response.redirected).toBe(true);
127+
});
128+
});
129+
130+
describe("Stream Handling", () => {
131+
it("enqueues data chunks from stream", async () => {
132+
const { mockStream, reader } = await setupReaderTest();
133+
134+
const chunk1 = Buffer.from("data1");
135+
const chunk2 = Buffer.from("data2");
136+
mockStream.emit("data", chunk1);
137+
mockStream.emit("data", chunk2);
138+
mockStream.emit("end");
139+
140+
const result1 = await reader.read();
141+
expect(result1.value).toEqual(chunk1);
142+
expect(result1.done).toBe(false);
143+
144+
const result2 = await reader.read();
145+
expect(result2.value).toEqual(chunk2);
146+
expect(result2.done).toBe(false);
147+
148+
const result3 = await reader.read();
149+
// Closed after end
150+
expect(result3.done).toBe(true);
151+
});
152+
153+
it("propagates stream errors", async () => {
154+
const { mockStream, reader } = await setupReaderTest();
155+
156+
const error = new Error("Stream error");
157+
mockStream.emit("error", error);
158+
159+
await expect(reader.read()).rejects.toThrow("Stream error");
160+
});
161+
162+
it("handles errors after stream is closed", async () => {
163+
const { mockStream, reader } = await setupReaderTest();
164+
165+
mockStream.emit("end");
166+
await reader.read();
167+
168+
// Emit events after stream is closed - should not throw
169+
expect(() => mockStream.emit("data", Buffer.from("late"))).not.toThrow();
170+
expect(() => mockStream.emit("end")).not.toThrow();
171+
});
172+
173+
it("destroys stream on cancel", async () => {
174+
const { mockStream, reader } = await setupReaderTest();
175+
176+
await reader.cancel();
177+
178+
expect(mockStream.destroy).toHaveBeenCalled();
179+
});
180+
});
181+
182+
async function setupReaderTest(): Promise<{
183+
mockStream: IncomingMessage;
184+
reader: ReaderLike | ReadableStreamDefaultReader<Uint8Array<ArrayBuffer>>;
185+
}> {
186+
const mockStream = createMockStream();
187+
setupAxiosResponse(mockAxios, 200, {}, mockStream);
188+
189+
const adapter = createStreamingFetchAdapter(mockAxios);
190+
const response = await adapter(TEST_URL);
191+
const reader = response.body!.getReader();
192+
193+
return { mockStream, reader };
194+
}
195+
});
196+
197+
function createMockStream(): IncomingMessage {
198+
const stream = new EventEmitter() as IncomingMessage;
199+
stream.destroy = vi.fn();
200+
return stream;
201+
}
202+
203+
function setupAxiosResponse(
204+
axios: AxiosInstance,
205+
status: number,
206+
headers: Record<string, string>,
207+
stream: IncomingMessage,
208+
): void {
209+
vi.mocked(axios.request).mockResolvedValue({
210+
status,
211+
headers,
212+
data: stream,
213+
});
214+
}

0 commit comments

Comments
 (0)