diff --git a/packages/scout-agent/agent.ts b/packages/scout-agent/agent.ts index d68abde..42d278a 100644 --- a/packages/scout-agent/agent.ts +++ b/packages/scout-agent/agent.ts @@ -64,8 +64,7 @@ agent.on("chat", async ({ id, messages }) => { }), }, }); - const stream = streamText(params); - return scout.processStreamTextOutput(stream); + return streamText(params); }); agent.serve(); diff --git a/packages/scout-agent/lib/compaction.test.ts b/packages/scout-agent/lib/compaction.test.ts index d5cd4d9..57138fe 100644 --- a/packages/scout-agent/lib/compaction.test.ts +++ b/packages/scout-agent/lib/compaction.test.ts @@ -1,16 +1,20 @@ /** biome-ignore-all lint/style/noNonNullAssertion: fine for tests */ /** biome-ignore-all lint/suspicious/noExplicitAny: fine for tests */ import { describe, expect, test } from "bun:test"; +import { APICallError } from "ai"; import { applyCompactionToMessages, buildCompactionRequestMessage, COMPACT_CONVERSATION_TOOL_NAME, COMPACTION_MARKER_TOOL_NAME, countCompactionMarkers, + maxConsecutiveCompactionAttempts, createCompactionMarkerPart, createCompactionTool, + findAPICallError, findCompactionSummary, isOutOfContextError, + MAX_CONSECUTIVE_COMPACTION_ATTEMPTS, } from "./compaction"; import type { Message } from "./types"; @@ -55,62 +59,77 @@ function summaryMsg( } describe("isOutOfContextError", () => { - test("returns true for Anthropic context limit errors", () => { - expect(isOutOfContextError(new Error("max_tokens_exceeded"))).toBe(true); + const createApiError = (message: string) => + new APICallError({ + message, + url: "https://api.example.com", + requestBodyValues: {}, + statusCode: 400, + }); + + test("returns true for APICallError with context limit message", () => { expect( - isOutOfContextError(new Error("The context window has been exceeded")) + isOutOfContextError( + createApiError("Input is too long for requested model") + ) ).toBe(true); - }); - - test("returns true for OpenAI context_length_exceeded errors", () => { - expect(isOutOfContextError(new Error("context_length_exceeded"))).toBe( + expect(isOutOfContextError(createApiError("context_length_exceeded"))).toBe( true ); }); - test("returns true for generic token limit exceeded messages", () => { - expect(isOutOfContextError(new Error("token limit exceeded"))).toBe(true); - expect( - isOutOfContextError(new Error("Token limit has been exceeded")) - ).toBe(true); - expect(isOutOfContextError(new Error("maximum tokens reached"))).toBe(true); + test("returns true for APICallError in cause chain", () => { + const apiError = createApiError("max_tokens_exceeded"); + const wrapper = new Error("Gateway error"); + (wrapper as { cause?: unknown }).cause = apiError; + expect(isOutOfContextError(wrapper)).toBe(true); }); - test("returns true for context window errors", () => { - expect(isOutOfContextError(new Error("context window exceeded"))).toBe( - true - ); - expect(isOutOfContextError(new Error("context length exceeded"))).toBe( - true + test("returns false for APICallError with unrelated message", () => { + expect(isOutOfContextError(createApiError("authentication failed"))).toBe( + false ); }); - test("returns true for input too long errors", () => { - expect(isOutOfContextError(new Error("input is too long"))).toBe(true); - expect(isOutOfContextError(new Error("prompt is too long"))).toBe(true); + test("returns false for non-APICallError even if message matches pattern", () => { + expect(isOutOfContextError(new Error("context_length_exceeded"))).toBe( + false + ); + expect(isOutOfContextError("input too long")).toBe(false); }); +}); + +describe("findAPICallError", () => { + const createApiError = (message: string) => + new APICallError({ + message, + url: "https://api.example.com", + requestBodyValues: {}, + statusCode: 400, + }); - test("returns false for unrelated errors", () => { - expect(isOutOfContextError(new Error("network error"))).toBe(false); - expect(isOutOfContextError(new Error("authentication failed"))).toBe(false); - expect(isOutOfContextError(new Error("rate limit exceeded"))).toBe(false); + test("returns the APICallError when provided directly", () => { + const error = createApiError("test"); + expect(findAPICallError(error)).toBe(error); }); - test("handles string messages", () => { - expect(isOutOfContextError("token limit exceeded")).toBe(true); - expect(isOutOfContextError("some other error")).toBe(false); + test("returns APICallError from single-level cause", () => { + const apiError = createApiError("test"); + const wrapper = new Error("wrapper"); + (wrapper as { cause?: unknown }).cause = apiError; + expect(findAPICallError(wrapper)).toBe(apiError); }); - test("handles objects with message property", () => { - expect(isOutOfContextError({ message: "token limit exceeded" })).toBe(true); - expect(isOutOfContextError({ message: "some other error" })).toBe(false); + test("returns APICallError from deep cause chain", () => { + const apiError = createApiError("test"); + const wrapper = { cause: { cause: apiError } }; + expect(findAPICallError(wrapper)).toBe(apiError); }); - test("returns false for non-error values", () => { - expect(isOutOfContextError(null)).toBe(false); - expect(isOutOfContextError(undefined)).toBe(false); - expect(isOutOfContextError(123)).toBe(false); - expect(isOutOfContextError({})).toBe(false); + test("returns null when no APICallError present", () => { + expect(findAPICallError(new Error("other"))).toBeNull(); + expect(findAPICallError("string")).toBeNull(); + expect(findAPICallError(null)).toBeNull(); }); }); @@ -285,6 +304,37 @@ describe("countCompactionMarkers", () => { }); }); +describe("maxConsecutiveCompactionAttempts", () => { + test("counts consecutive assistant compaction attempts", () => { + const messages: Message[] = [ + userMsg("1", "Hello"), + summaryMsg("summary-1", "Summary output 1"), + summaryMsg("summary-2", "Summary output 2"), + ]; + + expect(maxConsecutiveCompactionAttempts(messages)).toBe(2); + }); + + test("does not count non-consecutive compaction attempts", () => { + const messages: Message[] = [ + summaryMsg("summary-1", "First summary"), + userMsg("1", "Hello"), + summaryMsg("summary-2", "Second summary"), + ]; + + expect(maxConsecutiveCompactionAttempts(messages)).toBe(1); + }); + + test("stops at non-compaction assistant message", () => { + const messages: Message[] = [ + markerMsg("marker1"), + assistantMsg("assistant", "Normal reply"), + ]; + + expect(maxConsecutiveCompactionAttempts(messages)).toBe(0); + }); +}); + describe("buildCompactionRequestMessage", () => { test("creates user message with correct role", () => { const message = buildCompactionRequestMessage(); @@ -311,6 +361,20 @@ describe("applyCompactionToMessages", () => { expect(result).toEqual(messages); }); + test("throws when consecutive compaction attempts hit the limit", () => { + const attempts = MAX_CONSECUTIVE_COMPACTION_ATTEMPTS + 1; + const messages: Message[] = [ + userMsg("1", "Hello"), + ...Array.from({ length: attempts }, (_, idx) => + summaryMsg(`summary-${idx}`, `Summary ${idx}`) + ), + ]; + + expect(() => applyCompactionToMessages(messages)).toThrow( + /Compaction loop detected/ + ); + }); + test("excludes correct number of messages based on marker count", () => { const messages: Message[] = [ userMsg("1", "Message 1"), @@ -582,6 +646,10 @@ describe("applyCompactionToMessages", () => { userMsg("3", "Third message"), userMsg("4", "Fourth message"), markerMsg("marker1"), + assistantMsg( + "assistant-buffer", + "Normal reply between compaction attempts" + ), markerMsg("marker2"), userMsg("interrupted", "User interrupted compaction with this message"), markerMsg("marker3"), diff --git a/packages/scout-agent/lib/compaction.ts b/packages/scout-agent/lib/compaction.ts index 88d7d16..d849fb1 100644 --- a/packages/scout-agent/lib/compaction.ts +++ b/packages/scout-agent/lib/compaction.ts @@ -1,10 +1,18 @@ -import { type Tool, tool } from "ai"; +import { + APICallError, + type StreamTextTransform, + type TextStreamPart, + type Tool, + type ToolSet, + tool, +} from "ai"; import { z } from "zod"; import type { Message } from "./types"; // Constants export const COMPACTION_MARKER_TOOL_NAME = "__compaction_marker"; export const COMPACT_CONVERSATION_TOOL_NAME = "compact_conversation"; +export const MAX_CONSECUTIVE_COMPACTION_ATTEMPTS = 5; // Error patterns for out-of-context detection (regex) const OUT_OF_CONTEXT_PATTERNS = [ @@ -20,28 +28,77 @@ const OUT_OF_CONTEXT_PATTERNS = [ /maximum.*tokens/i, ]; +/** + * Recursively search for an APICallError in the error's cause chain. + */ +export function findAPICallError(error: unknown): APICallError | null { + if (APICallError.isInstance(error)) { + return error; + } + if (error && typeof error === "object" && "cause" in error) { + const cause = (error as { cause?: unknown }).cause; + return findAPICallError(cause); + } + return null; +} + /** * Check if an error is an out-of-context error based on known patterns. */ export function isOutOfContextError(error: unknown): boolean { - let message: string; - - if (error instanceof Error) { - message = error.message; - } else if (typeof error === "string") { - message = error; - } else if ( - error !== null && - typeof error === "object" && - "message" in error && - typeof error.message === "string" - ) { - message = error.message; - } else { + const apiError = findAPICallError(error); + if (!apiError) { return false; } + return OUT_OF_CONTEXT_PATTERNS.some((pattern) => + pattern.test(apiError.message) + ); +} - return OUT_OF_CONTEXT_PATTERNS.some((pattern) => pattern.test(message)); +/** + * Creates a stream transform that detects out-of-context errors and emits a compaction marker. + */ +export function createCompactionTransform( + onCompactionTriggered?: () => void +): StreamTextTransform { + return ({ stopStream }) => + new TransformStream, TextStreamPart>({ + transform(chunk, controller) { + if ( + chunk?.type === "error" && + isOutOfContextError((chunk as { error?: unknown }).error) + ) { + onCompactionTriggered?.(); + const markerPart = createCompactionMarkerPart(); + controller.enqueue({ + type: "tool-call", + toolCallType: "function", + toolCallId: markerPart.toolCallId, + toolName: markerPart.toolName, + input: markerPart.input, + dynamic: true, + } as TextStreamPart); + controller.enqueue({ + type: "tool-result", + toolCallId: markerPart.toolCallId, + toolName: markerPart.toolName, + input: markerPart.input, + output: markerPart.output, + providerExecuted: false, + dynamic: true, + } as TextStreamPart); + controller.enqueue({ + type: "finish", + finishReason: "tool-calls", + logprobs: undefined, + totalUsage: { inputTokens: 0, outputTokens: 0, totalTokens: 0 }, + } as TextStreamPart); + stopStream(); + return; + } + controller.enqueue(chunk); + }, + }); } /** @@ -109,6 +166,15 @@ function isCompactionSummaryPart(part: Message["parts"][number]): boolean { ); } +function isCompactConversationPart(part: Message["parts"][number]): boolean { + return ( + part.type === `tool-${COMPACT_CONVERSATION_TOOL_NAME}` || + (part.type === "dynamic-tool" && + "toolName" in part && + part.toolName === COMPACT_CONVERSATION_TOOL_NAME) + ); +} + export interface CompactionMarkerPart { type: "dynamic-tool"; toolName: typeof COMPACTION_MARKER_TOOL_NAME; @@ -203,6 +269,38 @@ export function countCompactionMarkers( return count; } +/** + * Finds the maximum number of consecutive assistant messages that contain + * compaction tool calls. The streak resets when a non-assistant message + * is encountered. + * + * @param messages - The message history to analyze + * @returns The longest streak of consecutive compaction attempts + */ +export function maxConsecutiveCompactionAttempts(messages: Message[]): number { + let maxAttempts = 0; + let attempts = 0; + + for (let i = messages.length - 1; i >= 0; i--) { + const message = messages[i]; + if (!message) { + continue; + } + if (message.role !== "assistant") { + attempts = 0; + } + const hasCompactionPart = message.parts.some((part) => + isCompactConversationPart(part) + ); + if (hasCompactionPart) { + attempts++; + maxAttempts = Math.max(maxAttempts, attempts); + } + } + + return maxAttempts; +} + /** * Build the compaction request message that instructs the model to compact. */ @@ -371,7 +469,7 @@ function transformMessagesForCompaction(messages: Message[]): Message[] { * would be excluded, throws `CompactionError`. * * ## Flow example - * 1. Model hits context limit → `processStreamTextOutput` emits compaction marker + * 1. Model hits context limit → compaction transform emits compaction marker * 2. Next iteration calls this function * 3. Messages are truncated + compaction request appended * 4. Model calls `compact_conversation` with summary @@ -382,6 +480,14 @@ function transformMessagesForCompaction(messages: Message[]): Message[] { * @throws {CompactionError} If compaction would leave no messages (too many retries) */ export function applyCompactionToMessages(messages: Message[]): Message[] { + const compactionAttempts = maxConsecutiveCompactionAttempts(messages); + if (compactionAttempts >= MAX_CONSECUTIVE_COMPACTION_ATTEMPTS) { + throw new CompactionError( + `Compaction loop detected after ${compactionAttempts} attempts`, + compactionAttempts + ); + } + const currentConversation = applySummaryToMessages(messages); const transformedMessages = transformMessagesForCompaction(currentConversation); diff --git a/packages/scout-agent/lib/core.test.ts b/packages/scout-agent/lib/core.test.ts index 0946481..65b192c 100644 --- a/packages/scout-agent/lib/core.test.ts +++ b/packages/scout-agent/lib/core.test.ts @@ -1,5 +1,6 @@ import { describe, expect, mock, test } from "bun:test"; import { + APICallError, readUIMessageStream, simulateReadableStream, streamText, @@ -992,7 +993,15 @@ describe("coder integration", () => { describe("compaction", () => { // Shared helpers for compaction tests - const CONTEXT_LENGTH_ERROR = "context_length_exceeded"; + const createContextApiError = ( + message = "Input is too long for requested model" + ) => + new APICallError({ + message, + url: "https://api.example.com", + requestBodyValues: {}, + statusCode: 400, + }); /** Check if a message contains the compaction marker */ const hasCompactionMarker = (msg: UIMessage) => @@ -1011,6 +1020,24 @@ describe("compaction", () => { (p.type === "dynamic-tool" && p.toolName === "compact_conversation") ); + /** Create a mock response that emits an out-of-context APICallError */ + const createContextErrorResponse = () => ({ + stream: simulateReadableStream({ + chunks: [{ type: "error" as const, error: createContextApiError() }], + }), + }); + + /** Create a mock response that emits text before an out-of-context error */ + const createMidStreamContextErrorResponse = () => ({ + stream: simulateReadableStream({ + chunks: [ + { type: "text-start" as const, id: "text-1" }, + { type: "text-delta" as const, id: "text-1", delta: "partial text" }, + { type: "error" as const, error: createContextApiError() }, + ], + }), + }); + /** Create a mock response that calls the compact_conversation tool */ const createCompactToolResponse = ( summary: string, @@ -1068,13 +1095,11 @@ describe("compaction", () => { messages, model, }); - return scout.processStreamTextOutput( - streamText({ - ...params, - // by default, streamText prints all errors to console.error, which is noisy in tests - onError: () => {}, - }) - ); + return streamText({ + ...params, + // by default, streamText prints all errors to console.error, which is noisy in tests + onError: () => {}, + }); }); return { agent, scout, chatID }; }; @@ -1103,6 +1128,24 @@ describe("compaction", () => { return textPart ? (textPart as { text: string }).text : undefined; }; + const createCompactionSummaryMessage = (id: string): Message => ({ + id, + role: "assistant", + parts: [ + { + type: "dynamic-tool", + toolName: "compact_conversation", + toolCallId: `${id}-call`, + state: "output-available", + input: { summary: "Test summary" }, + output: { + summary: "Test summary", + compacted_at: "2024-01-01T00:00:00Z", + }, + } as Message["parts"][number], + ], + }); + test("buildStreamTextParams always includes compact_conversation tool by default", async () => { const agent = new blink.Agent(); const scout = new Scout({ @@ -1138,11 +1181,46 @@ describe("compaction", () => { expect(params.tools.compact_conversation).toBeUndefined(); }); - test("buildStreamTextParams throws when exclusion would leave insufficient messages", async () => { + test("buildStreamTextParams disables compaction after repeated compaction attempts", async () => { + const warn = mock(); + const logger = { ...noopLogger, warn }; const agent = new blink.Agent(); const scout = new Scout({ agent, - logger: noopLogger, + logger, + }); + + const messages: Message[] = [ + { + id: "user-1", + role: "user", + parts: [{ type: "text", text: "Hello" }], + }, + createCompactionSummaryMessage("summary-1"), + createCompactionSummaryMessage("summary-2"), + createCompactionSummaryMessage("summary-3"), + createCompactionSummaryMessage("summary-4"), + createCompactionSummaryMessage("summary-5"), + ]; + + const params = await scout.buildStreamTextParams({ + chatID: "test-chat-id" as blink.ID, + messages, + model: newMockModel({ textResponse: "test" }), + }); + + expect(params.tools.compact_conversation).toBeUndefined(); + expect(params.experimental_transform).toBeUndefined(); + expect(warn).toHaveBeenCalled(); + }); + + test("buildStreamTextParams disables compaction when exclusion would leave insufficient messages", async () => { + const warn = mock(); + const logger = { ...noopLogger, warn }; + const agent = new blink.Agent(); + const scout = new Scout({ + agent, + logger, }); // Create messages with insufficient content to summarize after exclusion @@ -1171,48 +1249,15 @@ describe("compaction", () => { }, ]; - await expect( - scout.buildStreamTextParams({ - chatID: "test-chat-id" as blink.ID, - messages, - model: newMockModel({ textResponse: "test" }), - }) - ).rejects.toThrow(/Cannot compact/); - }); - - test("processStreamTextOutput passes through normal stream unchanged", async () => { - const agent = new blink.Agent(); - const scout = new Scout({ - agent, - logger: noopLogger, - }); - const params = await scout.buildStreamTextParams({ chatID: "test-chat-id" as blink.ID, - messages: [ - { - id: "user-1", - role: "user", - parts: [{ type: "text", text: "Hello" }], - }, - ], - model: newMockModel({ textResponse: "Hello World" }), - compaction: false, + messages, + model: newMockModel({ textResponse: "test" }), }); - const stream = streamText(params); - const processedStream = scout.processStreamTextOutput(stream); - - const collectedChunks: { type: string }[] = []; - for await (const chunk of processedStream.fullStream) { - collectedChunks.push(chunk as { type: string }); - } - - // Should have text chunks and finish - expect(collectedChunks.some((c) => c.type === "text-delta")).toBe(true); - expect(collectedChunks.some((c) => c.type === "finish")).toBe(true); - // Should NOT have any compaction markers - expect(collectedChunks.some((c) => c.type === "tool-result")).toBe(false); + expect(params.tools.compact_conversation).toBeUndefined(); + expect(params.experimental_transform).toBeUndefined(); + expect(warn).toHaveBeenCalled(); }); test("e2e: complete compaction flow using scout methods directly", async () => { @@ -1222,7 +1267,7 @@ describe("compaction", () => { const model = new MockLanguageModelV2({ doStream: async () => { modelCallCount++; - if (modelCallCount === 1) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount === 1) return createContextErrorResponse(); if (modelCallCount === 2) return createCompactToolResponse( "Previous conversation summary from model." @@ -1294,7 +1339,7 @@ describe("compaction", () => { const model = new MockLanguageModelV2({ doStream: async () => { modelCallCount++; - if (modelCallCount <= 2) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount <= 2) return createContextErrorResponse(); if (modelCallCount === 3) return createCompactToolResponse("Summary of the old conversation."); return createTextResponse("Response after compaction"); @@ -1384,7 +1429,7 @@ describe("compaction", () => { const model = new MockLanguageModelV2({ doStream: async () => { modelCallCount++; - if (modelCallCount === 1) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount === 1) return createContextErrorResponse(); if (modelCallCount === 2) throw new Error("network_error: connection refused"); if (modelCallCount === 3) @@ -1453,7 +1498,7 @@ describe("compaction", () => { const model = new MockLanguageModelV2({ doStream: async () => { modelCallCount++; - if (modelCallCount === 1) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount === 1) return createContextErrorResponse(); if (modelCallCount === 2) return createCompactToolResponse("Error recovery summary."); return createTextResponse("Success after error recovery"); @@ -1507,19 +1552,7 @@ describe("compaction", () => { modelCallCount++; if (modelCallCount === 1) { // Stream that emits some chunks, then errors mid-stream - return { - stream: new ReadableStream({ - start(controller) { - controller.enqueue({ type: "text-start", id: "text-1" }); - controller.enqueue({ - type: "text-delta", - id: "text-1", - delta: "Starting to respond...", - }); - controller.error(new Error(CONTEXT_LENGTH_ERROR)); - }, - }), - }; + return createMidStreamContextErrorResponse(); } if (modelCallCount === 2) return createCompactToolResponse( @@ -1585,7 +1618,7 @@ describe("compaction", () => { capturedMessages.push(messageContents); // Cycle 1: calls 1-3 - if (modelCallCount === 1) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount === 1) return createContextErrorResponse(); if (modelCallCount === 2) return createCompactToolResponse( "First compaction summary from cycle 1." @@ -1593,7 +1626,7 @@ describe("compaction", () => { if (modelCallCount === 3) return createTextResponse("First cycle complete"); // Cycle 2: calls 4-6 - if (modelCallCount === 4) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount === 4) return createContextErrorResponse(); if (modelCallCount === 5) return createCompactToolResponse( "Second compaction summary from cycle 2.", @@ -1686,7 +1719,7 @@ describe("compaction", () => { const model = new MockLanguageModelV2({ doStream: async () => { modelCallCount++; - if (modelCallCount === 1) throw new Error(CONTEXT_LENGTH_ERROR); + if (modelCallCount === 1) return createContextErrorResponse(); if (modelCallCount === 2) return createCompactToolResponse("Summary of conversation so far."); return createTextResponse("Final response"); diff --git a/packages/scout-agent/lib/core.ts b/packages/scout-agent/lib/core.ts index d2ed4d0..12b9dda 100644 --- a/packages/scout-agent/lib/core.ts +++ b/packages/scout-agent/lib/core.ts @@ -4,13 +4,19 @@ import type * as github from "@blink-sdk/github"; import withModelIntent from "@blink-sdk/model-intent"; import * as slack from "@blink-sdk/slack"; import type { App } from "@slack/bolt"; -import { convertToModelMessages, type LanguageModel, type Tool } from "ai"; +import { + convertToModelMessages, + type LanguageModel, + type StreamTextTransform, + type Tool, + type ToolSet, +} from "ai"; import type * as blink from "blink"; import { applyCompactionToMessages, - createCompactionMarkerPart, + CompactionError, + createCompactionTransform, createCompactionTool, - isOutOfContextError, } from "./compaction"; import { type CoderApiClient, @@ -40,7 +46,8 @@ import { createSlackApp, createSlackTools, getSlackMetadata } from "./slack"; import type { Message } from "./types"; import { createWebSearchTools } from "./web-search"; -type Tools = Partial> & +type Tools = ToolSet & + Partial> & Partial> & Record; @@ -345,6 +352,7 @@ export class Scout { maxOutputTokens: number; providerOptions?: ProviderOptions; tools: Tools; + experimental_transform?: StreamTextTransform; }> { this.printConfigWarnings(); @@ -462,6 +470,25 @@ export class Scout { } } + let compactionEnabled = compaction; + let messagesToConvert = messages; + if (compactionEnabled) { + try { + messagesToConvert = applyCompactionToMessages(messages); + } catch (error) { + if (error instanceof CompactionError) { + this.logger.warn( + "Disabling compaction due to repeated compaction failures", + error + ); + compactionEnabled = false; + messagesToConvert = messages; + } else { + throw error; + } + } + } + const tools = { ...(this.webSearch.config ? createWebSearchTools({ exaApiKey: this.webSearch.config.exaApiKey }) @@ -478,7 +505,7 @@ export class Scout { : undefined), ...computeTools, // Always include compaction tool when compaction is enabled (for caching purposes) - ...(compaction ? createCompactionTool() : {}), + ...(compactionEnabled ? createCompactionTool() : {}), ...providedTools, }; @@ -491,10 +518,6 @@ ${slack.formattingRules} `; } - const messagesToConvert = compaction - ? applyCompactionToMessages(messages) - : messages; - const converted = convertToModelMessages(messagesToConvert, { ignoreIncompleteToolCalls: true, tools, @@ -518,109 +541,9 @@ ${slack.formattingRules} maxOutputTokens: 64_000, providerOptions, tools: withModelIntent(tools), + experimental_transform: compactionEnabled + ? createCompactionTransform() + : undefined, }; } - - /** - * Process the output from streamText, intercepting out-of-context errors - * and replacing them with compaction markers. - * - * @param stream - The StreamTextResult from the AI SDK's streamText() - * @param options - Optional callbacks - * @returns The same stream, but with toUIMessageStream wrapped to handle errors - */ - processStreamTextOutput< - // biome-ignore lint/suspicious/noExplicitAny: toUIMessageStream has complex overloaded signature - T extends { toUIMessageStream: (...args: any[]) => any }, - >( - stream: T, - options?: { - onCompactionTriggered?: () => void; - } - ): T { - // Use a Proxy to wrap toUIMessageStream - return new Proxy(stream, { - get(target, prop) { - // Wrap toUIMessageStream to intercept out-of-context errors - if (prop === "toUIMessageStream") { - const originalMethod = target.toUIMessageStream; - return (...args: unknown[]) => { - const uiStream = originalMethod.apply(target, args); - - // Helper to emit compaction marker chunks - const emitCompactionMarker = ( - controller: ReadableStreamDefaultController - ) => { - options?.onCompactionTriggered?.(); - const markerPart = createCompactionMarkerPart(); - controller.enqueue({ - type: "tool-input-start", - toolCallId: markerPart.toolCallId, - toolName: markerPart.toolName, - }); - controller.enqueue({ - type: "tool-input-available", - toolCallId: markerPart.toolCallId, - toolName: markerPart.toolName, - input: markerPart.input, - }); - controller.enqueue({ - type: "tool-output-available", - toolCallId: markerPart.toolCallId, - output: markerPart.output, - preliminary: false, - }); - }; - - // Use a custom ReadableStream to handle both error chunks and mid-stream errors - // This approach catches errors from controller.error() which TransformStream doesn't handle - return new ReadableStream({ - async start(controller) { - const reader = uiStream.getReader(); - try { - while (true) { - const { done, value: chunk } = await reader.read(); - if (done) break; - - // Check if this is an error chunk in UI format - if ( - chunk && - typeof chunk === "object" && - "type" in chunk && - chunk.type === "error" && - "errorText" in chunk && - typeof chunk.errorText === "string" && - isOutOfContextError(new Error(chunk.errorText)) - ) { - emitCompactionMarker(controller); - continue; - } - controller.enqueue(chunk); - } - controller.close(); - } catch (error) { - // Mid-stream error via controller.error() - check if it's out of context - if (isOutOfContextError(error)) { - emitCompactionMarker(controller); - controller.close(); - } else { - controller.error(error); - } - } finally { - reader.releaseLock(); - } - }, - }); - }; - } - - const value = target[prop as keyof T]; - // Bind functions to the original target to preserve 'this' context - if (typeof value === "function") { - return value.bind(target); - } - return value; - }, - }) as T; - } } diff --git a/packages/scout-agent/package.json b/packages/scout-agent/package.json index 1c4ff8d..ec47668 100644 --- a/packages/scout-agent/package.json +++ b/packages/scout-agent/package.json @@ -1,7 +1,7 @@ { "name": "@blink-sdk/scout-agent", "description": "A general-purpose AI agent with GitHub, Slack, web search, and compute capabilities built on Blink SDK.", - "version": "0.0.11", + "version": "0.0.12", "type": "module", "keywords": [ "blink",