diff --git a/src/index.ts b/src/index.ts index 09e2ebf..5121a18 100644 --- a/src/index.ts +++ b/src/index.ts @@ -109,4 +109,6 @@ export { } from './lib/tool-types.js'; // Turn context helpers export { buildTurnContext, normalizeInputToArray } from './lib/turn-context.js'; +// Real-time tool event broadcasting +export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js'; export * from './sdk/sdk.js'; diff --git a/src/lib/model-result.ts b/src/lib/model-result.ts index b4a153f..911fa65 100644 --- a/src/lib/model-result.ts +++ b/src/lib/model-result.ts @@ -12,6 +12,7 @@ import type { ToolStreamEvent, TurnContext, } from './tool-types.js'; +import { ToolEventBroadcaster } from './tool-event-broadcaster.js'; import { betaResponsesSend } from '../funcs/betaResponsesSend.js'; import { @@ -106,7 +107,11 @@ export class ModelResult { private initPromise: Promise | null = null; private toolExecutionPromise: Promise | null = null; private finalResponse: models.OpenResponsesNonStreamingResponse | null = null; - private preliminaryResults: Map = new Map(); + private toolEventBroadcaster: ToolEventBroadcaster<{ + type: 'preliminary_result'; + toolCallId: string; + result: InferToolEventsUnion; + }> | null = null; private allToolExecutionRounds: Array<{ round: number; toolCalls: ParsedToolCall[]; @@ -120,6 +125,21 @@ export class ModelResult { this.options = options; } + /** + * Get or create the tool event broadcaster (lazy initialization). + * Ensures only one broadcaster exists for the lifetime of this ModelResult. + */ + private ensureBroadcaster(): ToolEventBroadcaster<{ + type: 'preliminary_result'; + toolCallId: string; + result: InferToolEventsUnion; + }> { + if (!this.toolEventBroadcaster) { + this.toolEventBroadcaster = new ToolEventBroadcaster(); + } + return this.toolEventBroadcaster; + } + /** * Type guard to check if a value is a non-streaming response */ @@ -163,8 +183,7 @@ export class ModelResult { // Already resolved, extract non-function fields // Since request is CallModelInput, we need to filter out stopWhen // Note: tools are already in API format at this point (converted in callModel()) - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { stopWhen, ...rest } = this.options.request; + const { stopWhen: _, ...rest } = this.options.request; // Cast to ResolvedCallModelInput - we know it's resolved if hasAsyncFunctions returned false baseRequest = rest as ResolvedCallModelInput; } @@ -325,12 +344,22 @@ export class ModelResult { continue; } - const result = await executeTool(tool, toolCall, turnContext); - - // Store preliminary results - if (result.preliminaryResults && result.preliminaryResults.length > 0) { - this.preliminaryResults.set(toolCall.id, result.preliminaryResults); - } + // Create callback for real-time preliminary results + const onPreliminaryResult = this.toolEventBroadcaster + ? (callId: string, resultValue: unknown) => { + try { + this.toolEventBroadcaster!.push({ + type: 'preliminary_result' as const, + toolCallId: callId, + result: resultValue as InferToolEventsUnion, + }); + } catch { + // Don't crash tool execution if broadcasting fails + } + } + : undefined; + + const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult); toolResults.push({ type: 'function_call_output' as const, @@ -480,7 +509,7 @@ export class ModelResult { /** * Stream all response events as they arrive. * Multiple consumers can iterate over this stream concurrently. - * Includes preliminary tool result events after tool execution. + * Preliminary tool results are streamed in REAL-TIME as generator tools yield. */ getFullResponsesStream(): AsyncIterableIterator>> { return async function* (this: ModelResult) { @@ -489,27 +518,34 @@ export class ModelResult { throw new Error('Stream not initialized'); } + // Get or create broadcaster for real-time tool events (lazy init prevents race conditions) + const broadcaster = this.ensureBroadcaster(); + const toolEventConsumer = broadcaster.createConsumer(); + + // Start tool execution in background (completes broadcaster when done) + const executionPromise = this.executeToolsIfNeeded().finally(() => { + broadcaster.complete(); + }); + const consumer = this.reusableStream.createConsumer(); - // Yield original events directly + // Yield original API events for await (const event of consumer) { yield event; } - // After stream completes, check if tools were executed and emit preliminary results - await this.executeToolsIfNeeded(); - - // Emit all preliminary results as new event types - for (const [toolCallId, results] of this.preliminaryResults) { - for (const result of results) { - yield { - type: 'tool.preliminary_result' as const, - toolCallId, - result: result as InferToolEventsUnion, - timestamp: Date.now(), - }; - } + // Yield tool preliminary results as they arrive (real-time!) + for await (const event of toolEventConsumer) { + yield { + type: 'tool.preliminary_result' as const, + toolCallId: event.toolCallId, + result: event.result, + timestamp: Date.now(), + }; } + + // Ensure execution completed (handles errors) + await executionPromise; }.call(this); } @@ -587,7 +623,7 @@ export class ModelResult { /** * Stream tool call argument deltas and preliminary results. - * This filters the full event stream to yield: + * Preliminary results are streamed in REAL-TIME as generator tools yield. * - Tool call argument deltas as { type: "delta", content: string } * - Preliminary results as { type: "preliminary_result", toolCallId, result } */ @@ -598,7 +634,16 @@ export class ModelResult { throw new Error('Stream not initialized'); } - // Yield tool deltas as structured events + // Get or create broadcaster for real-time tool events (lazy init prevents race conditions) + const broadcaster = this.ensureBroadcaster(); + const toolEventConsumer = broadcaster.createConsumer(); + + // Start tool execution in background (completes broadcaster when done) + const executionPromise = this.executeToolsIfNeeded().finally(() => { + broadcaster.complete(); + }); + + // Yield tool deltas from API stream for await (const delta of extractToolDeltas(this.reusableStream)) { yield { type: 'delta' as const, @@ -606,19 +651,13 @@ export class ModelResult { }; } - // After stream completes, check if tools were executed and emit preliminary results - await this.executeToolsIfNeeded(); - - // Emit all preliminary results - for (const [toolCallId, results] of this.preliminaryResults) { - for (const result of results) { - yield { - type: 'preliminary_result' as const, - toolCallId, - result: result as InferToolEventsUnion, - }; - } + // Yield tool events as they arrive (real-time!) + for await (const event of toolEventConsumer) { + yield event; } + + // Ensure execution completed (handles errors) + await executionPromise; }.call(this); } diff --git a/src/lib/tool-event-broadcaster.ts b/src/lib/tool-event-broadcaster.ts new file mode 100644 index 0000000..a0a695b --- /dev/null +++ b/src/lib/tool-event-broadcaster.ts @@ -0,0 +1,168 @@ +/** + * A push-based event broadcaster that supports multiple concurrent consumers. + * Similar to ReusableReadableStream but for push-based events from tool execution. + * + * Each consumer gets their own position in the buffer and receives all events + * from their join point onward. This enables real-time streaming of generator + * tool preliminary results to multiple consumers simultaneously. + * + * @template T - The event type being broadcast + */ +export class ToolEventBroadcaster { + private buffer: T[] = []; + private consumers = new Map(); + private nextConsumerId = 0; + private isComplete = false; + private completionError: Error | null = null; + + /** + * Push a new event to all consumers. + * Events are buffered so late-joining consumers can catch up. + */ + push(event: T): void { + if (this.isComplete) return; + this.buffer.push(event); + this.notifyWaitingConsumers(); + } + + /** + * Mark the broadcaster as complete - no more events will be pushed. + * Optionally pass an error to signal failure to all consumers. + * Cleans up buffer and consumers after completion. + */ + complete(error?: Error): void { + this.isComplete = true; + this.completionError = error ?? null; + this.notifyWaitingConsumers(); + // Schedule cleanup after consumers have processed completion + queueMicrotask(() => this.cleanup()); + } + + /** + * Clean up resources after all consumers have finished. + * Called automatically after complete(), but can be called manually. + */ + private cleanup(): void { + // Only cleanup if complete and all consumers are done + if (this.isComplete && this.consumers.size === 0) { + this.buffer = []; + } + } + + /** + * Create a new consumer that can independently iterate over events. + * Consumers can join at any time and will receive events from position 0. + * Multiple consumers can be created and will all receive the same events. + */ + createConsumer(): AsyncIterableIterator { + const consumerId = this.nextConsumerId++; + const state: ConsumerState = { + position: 0, + waitingPromise: null, + cancelled: false, + }; + this.consumers.set(consumerId, state); + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + + return { + async next(): Promise> { + const consumer = self.consumers.get(consumerId); + if (!consumer) { + return { done: true, value: undefined }; + } + + if (consumer.cancelled) { + return { done: true, value: undefined }; + } + + // Return buffered event if available + if (consumer.position < self.buffer.length) { + const value = self.buffer[consumer.position]!; + consumer.position++; + return { done: false, value }; + } + + // If complete and caught up, we're done + if (self.isComplete) { + self.consumers.delete(consumerId); + self.cleanup(); + if (self.completionError) { + throw self.completionError; + } + return { done: true, value: undefined }; + } + + // Set up waiting promise FIRST to avoid race condition + const waitPromise = new Promise((resolve, reject) => { + consumer.waitingPromise = { resolve, reject }; + + // Immediately check if we should resolve after setting up promise + if ( + self.isComplete || + self.completionError || + consumer.position < self.buffer.length + ) { + resolve(); + } + }); + + await waitPromise; + consumer.waitingPromise = null; + + // Recursively try again after waking up + return this.next(); + }, + + async return(): Promise> { + const consumer = self.consumers.get(consumerId); + if (consumer) { + consumer.cancelled = true; + self.consumers.delete(consumerId); + self.cleanup(); + } + return { done: true, value: undefined }; + }, + + async throw(e?: unknown): Promise> { + const consumer = self.consumers.get(consumerId); + if (consumer) { + consumer.cancelled = true; + self.consumers.delete(consumerId); + self.cleanup(); + } + throw e; + }, + + [Symbol.asyncIterator]() { + return this; + }, + }; + } + + /** + * Notify all waiting consumers that new data is available or stream completed + */ + private notifyWaitingConsumers(): void { + for (const consumer of this.consumers.values()) { + if (consumer.waitingPromise) { + if (this.completionError) { + consumer.waitingPromise.reject(this.completionError); + } else { + consumer.waitingPromise.resolve(); + } + consumer.waitingPromise = null; + } + } + } +} + +interface ConsumerState { + position: number; + waitingPromise: { + resolve: () => void; + reject: (error: Error) => void; + } | null; + cancelled: boolean; +} diff --git a/src/lib/tool-executor.ts b/src/lib/tool-executor.ts index 87901d3..7897ed1 100644 --- a/src/lib/tool-executor.ts +++ b/src/lib/tool-executor.ts @@ -14,8 +14,9 @@ import { hasExecuteFunction, isGeneratorTool, isRegularExecuteTool } from './too * Convert a Zod schema to JSON Schema using Zod v4's toJSONSchema function */ export function convertZodToJsonSchema(zodSchema: ZodType): Record { + // Use draft-7 as it's closest to OpenAPI 3.0's JSON Schema variant const jsonSchema = toJSONSchema(zodSchema, { - target: 'openapi-3.0', + target: 'draft-7', }); return jsonSchema; } diff --git a/tests/e2e/call-model-tools.test.ts b/tests/e2e/call-model-tools.test.ts index f0fe324..47f605e 100644 --- a/tests/e2e/call-model-tools.test.ts +++ b/tests/e2e/call-model-tools.test.ts @@ -26,7 +26,7 @@ describe('Enhanced Tool Support for callModel', () => { }); const jsonSchema = toJSONSchema(schema, { - target: 'openapi-3.0', + target: 'draft-7', }); expect(jsonSchema).toHaveProperty('type', 'object'); @@ -48,7 +48,7 @@ describe('Enhanced Tool Support for callModel', () => { }); const jsonSchema = toJSONSchema(schema, { - target: 'openapi-3.0', + target: 'draft-7', }); expect(jsonSchema.properties?.user).toBeDefined(); @@ -61,7 +61,7 @@ describe('Enhanced Tool Support for callModel', () => { }); const jsonSchema = toJSONSchema(schema, { - target: 'openapi-3.0', + target: 'draft-7', }); expect(jsonSchema.properties?.location?.['description']).toBe( diff --git a/tests/e2e/call-model.test.ts b/tests/e2e/call-model.test.ts index e7943b9..ef1ad7c 100644 --- a/tests/e2e/call-model.test.ts +++ b/tests/e2e/call-model.test.ts @@ -160,7 +160,7 @@ describe('callModel E2E Tests', () => { it('should work with chat-style messages and chat-style tools together', async () => { const response = client.callModel({ - model: 'meta-llama/llama-3.1-8b-instruct', + model: 'anthropic/claude-haiku-4.5', input: fromChatMessages([ { role: 'system', diff --git a/tests/unit/tool-event-broadcaster.test.ts b/tests/unit/tool-event-broadcaster.test.ts new file mode 100644 index 0000000..a9fce6d --- /dev/null +++ b/tests/unit/tool-event-broadcaster.test.ts @@ -0,0 +1,300 @@ +import { describe, expect, it } from 'vitest'; +import { ToolEventBroadcaster } from '../../src/lib/tool-event-broadcaster.js'; + +describe('ToolEventBroadcaster', () => { + describe('single consumer', () => { + it('should deliver events to a single consumer', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push(1); + broadcaster.push(2); + broadcaster.push(3); + broadcaster.complete(); + + const results: number[] = []; + for await (const event of consumer) { + results.push(event); + } + + expect(results).toEqual([1, 2, 3]); + }); + + it('should handle empty stream', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.complete(); + + const results: string[] = []; + for await (const event of consumer) { + results.push(event); + } + + expect(results).toEqual([]); + }); + + it('should handle consumer cancellation via return()', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push(1); + broadcaster.push(2); + + // Get first value + const first = await consumer.next(); + expect(first.done).toBe(false); + expect(first.value).toBe(1); + + // Cancel consumer + await consumer.return!(); + + // Should be done now + const after = await consumer.next(); + expect(after.done).toBe(true); + }); + }); + + describe('multiple consumers', () => { + it('should deliver same events to multiple consumers', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer1 = broadcaster.createConsumer(); + const consumer2 = broadcaster.createConsumer(); + + broadcaster.push('a'); + broadcaster.push('b'); + broadcaster.complete(); + + const results1: string[] = []; + const results2: string[] = []; + + await Promise.all([ + (async () => { + for await (const e of consumer1) results1.push(e); + })(), + (async () => { + for await (const e of consumer2) results2.push(e); + })(), + ]); + + expect(results1).toEqual(['a', 'b']); + expect(results2).toEqual(['a', 'b']); + }); + + it('should allow consumers at different read positions', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer1 = broadcaster.createConsumer(); + + broadcaster.push(1); + broadcaster.push(2); + + // Consumer 1 reads first event + const first = await consumer1.next(); + expect(first.value).toBe(1); + + // Consumer 2 joins after events pushed + const consumer2 = broadcaster.createConsumer(); + + broadcaster.push(3); + broadcaster.complete(); + + // Consumer 1 continues from position 1 + const remaining1: number[] = []; + for await (const e of consumer1) remaining1.push(e); + expect(remaining1).toEqual([2, 3]); + + // Consumer 2 gets all events from position 0 + const all2: number[] = []; + for await (const e of consumer2) all2.push(e); + expect(all2).toEqual([1, 2, 3]); + }); + }); + + describe('async waiting', () => { + it('should wait for events when consumer is ahead of buffer', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + // Start consuming before events arrive + const consumePromise = (async () => { + const results: number[] = []; + for await (const event of consumer) { + results.push(event); + } + return results; + })(); + + // Push events after consumer starts waiting + await new Promise((r) => setTimeout(r, 10)); + broadcaster.push(1); + await new Promise((r) => setTimeout(r, 10)); + broadcaster.push(2); + broadcaster.complete(); + + const results = await consumePromise; + expect(results).toEqual([1, 2]); + }); + + it('should handle rapid push/consume interleaving', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + const received: number[] = []; + const consumePromise = (async () => { + for await (const event of consumer) { + received.push(event); + } + })(); + + // Push events with minimal delay + for (let i = 0; i < 10; i++) { + broadcaster.push(i); + await new Promise((r) => setTimeout(r, 1)); + } + broadcaster.complete(); + + await consumePromise; + expect(received).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }); + }); + + describe('error handling', () => { + it('should propagate errors to consumers', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push(1); + broadcaster.complete(new Error('Test error')); + + const results: number[] = []; + let caughtError: Error | null = null; + + try { + for await (const event of consumer) { + results.push(event); + } + } catch (e) { + caughtError = e as Error; + } + + expect(results).toEqual([1]); + expect(caughtError).not.toBeNull(); + expect(caughtError!.message).toBe('Test error'); + }); + + it('should propagate errors to waiting consumers', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + // Start consuming (will wait) + const consumePromise = (async () => { + const results: number[] = []; + for await (const event of consumer) { + results.push(event); + } + return results; + })(); + + // Complete with error while consumer is waiting + await new Promise((r) => setTimeout(r, 10)); + broadcaster.complete(new Error('Async error')); + + await expect(consumePromise).rejects.toThrow('Async error'); + }); + }); + + describe('ignore after complete', () => { + it('should ignore pushes after complete', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push(1); + broadcaster.complete(); + broadcaster.push(2); // Should be ignored + + const results: number[] = []; + for await (const event of consumer) { + results.push(event); + } + + expect(results).toEqual([1]); + }); + }); + + describe('completion between iterations', () => { + it('should handle completion between consumer iterations', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push(1); + const first = await consumer.next(); + expect(first.done).toBe(false); + expect(first.value).toBe(1); + + // Complete while consumer is between iterations + broadcaster.complete(); + + const second = await consumer.next(); + expect(second.done).toBe(true); + }); + + it('should handle completion with remaining buffered events', async () => { + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push(1); + broadcaster.push(2); + broadcaster.push(3); + + // Read first event + const first = await consumer.next(); + expect(first.value).toBe(1); + + // Complete with events still in buffer + broadcaster.complete(); + + // Should still get remaining buffered events + const second = await consumer.next(); + expect(second.value).toBe(2); + + const third = await consumer.next(); + expect(third.value).toBe(3); + + // Now should be done + const fourth = await consumer.next(); + expect(fourth.done).toBe(true); + }); + }); + + describe('typed events', () => { + it('should work with typed tool events', async () => { + type ToolEvent = + | { type: 'delta'; content: string } + | { type: 'preliminary_result'; toolCallId: string; result: unknown }; + + const broadcaster = new ToolEventBroadcaster(); + const consumer = broadcaster.createConsumer(); + + broadcaster.push({ type: 'delta', content: 'test' }); + broadcaster.push({ + type: 'preliminary_result', + toolCallId: 'call_123', + result: { progress: 50 }, + }); + broadcaster.complete(); + + const events: ToolEvent[] = []; + for await (const event of consumer) { + events.push(event); + } + + expect(events).toHaveLength(2); + expect(events[0]).toEqual({ type: 'delta', content: 'test' }); + expect(events[1]).toEqual({ + type: 'preliminary_result', + toolCallId: 'call_123', + result: { progress: 50 }, + }); + }); + }); +});