Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ export {
} from './lib/stream-transformers.js';
// Tool creation helpers
export { tool } from './lib/tool.js';
// Real-time tool event broadcasting
export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js';
export {
hasApprovalRequiredTools,
hasExecuteFunction,
Expand Down
216 changes: 180 additions & 36 deletions src/lib/model-result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
TurnContext,
UnsentToolResult,
} from './tool-types.js';
import { ToolEventBroadcaster } from './tool-event-broadcaster.js';

import { betaResponsesSend } from '../funcs/betaResponsesSend.js';
import {
Expand Down Expand Up @@ -134,7 +135,11 @@ export class ModelResult<TTools extends readonly Tool[]> {
private initPromise: Promise<void> | null = null;
private toolExecutionPromise: Promise<void> | null = null;
private finalResponse: models.OpenResponsesNonStreamingResponse | null = null;
private preliminaryResults: Map<string, unknown[]> = new Map();
private toolEventBroadcaster: ToolEventBroadcaster<{
type: 'preliminary_result';
toolCallId: string;
result: InferToolEventsUnion<TTools>;
}> | null = null;
private allToolExecutionRounds: Array<{
round: number;
toolCalls: ParsedToolCall<Tool>[];
Expand Down Expand Up @@ -402,23 +407,35 @@ export class ModelResult<TTools extends readonly Tool[]> {
/**
* Execute all tools in a single round
* Returns the tool results for API submission
* @param broadcaster - Optional broadcaster for real-time preliminary result streaming
*/
private async executeToolRound(
toolCalls: ParsedToolCall<Tool>[],
turnContext: TurnContext
turnContext: TurnContext,
broadcaster?: ToolEventBroadcaster<{
type: 'preliminary_result';
toolCallId: string;
result: InferToolEventsUnion<TTools>;
}>
): Promise<models.OpenResponsesFunctionCallOutput[]> {
const toolResults: models.OpenResponsesFunctionCallOutput[] = [];

for (const toolCall of toolCalls) {
const tool = this.options.tools?.find((t) => t.function.name === toolCall.name);
if (!tool || !hasExecuteFunction(tool)) continue;

const result = await executeTool(tool, toolCall, turnContext);
// Create callback for real-time preliminary results
const onPreliminaryResult = broadcaster
? (callId: string, result: unknown) => {
broadcaster.push({
type: 'preliminary_result' as const,
toolCallId: callId,
result: result as InferToolEventsUnion<TTools>,
});
}
: undefined;

// Store preliminary results for streaming
if (result.preliminaryResults && result.preliminaryResults.length > 0) {
this.preliminaryResults.set(toolCall.id, result.preliminaryResults);
}
const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult);

toolResults.push({
type: 'function_call_output' as const,
Expand Down Expand Up @@ -914,6 +931,127 @@ export class ModelResult<TTools extends readonly Tool[]> {
return this.toolExecutionPromise;
}

/**
* Execute tools with real-time broadcasting of preliminary results.
* This is used by streaming methods that want real-time tool events.
* Unlike executeToolsIfNeeded, this creates a new broadcaster and passes it through.
*/
private async executeToolsWithBroadcast(
broadcaster: ToolEventBroadcaster<{
type: 'preliminary_result';
toolCallId: string;
result: InferToolEventsUnion<TTools>;
}>
): Promise<void> {
try {
await this.initStream();

// If resuming from approval and still pending, don't continue
if (this.isResumingFromApproval && this.currentState?.status === 'awaiting_approval') {
return;
}

// Get initial response
let currentResponse = await this.getInitialResponse();

// Save initial response to state
await this.saveResponseToState(currentResponse);

// Check if tools should be executed
const hasToolCalls = currentResponse.output.some(
(item) => hasTypeProperty(item) && item.type === 'function_call'
);

if (!this.options.tools?.length || !hasToolCalls) {
this.finalResponse = currentResponse;
await this.markStateComplete();
return;
}

// Extract and check tool calls
const toolCalls = extractToolCallsFromResponse(currentResponse);

// Check for approval requirements
if (await this.handleApprovalCheck(toolCalls, 0, currentResponse)) {
return; // Paused for approval
}

if (!this.hasExecutableToolCalls(toolCalls)) {
this.finalResponse = currentResponse;
await this.markStateComplete();
return;
}

// Main execution loop
let currentRound = 0;

while (true) {
// Check for external interruption
if (await this.checkForInterruption(currentResponse)) {
return;
}

// Check stop conditions
if (await this.shouldStopExecution()) {
break;
}

const currentToolCalls = extractToolCallsFromResponse(currentResponse);
if (currentToolCalls.length === 0) {
break;
}

// Check for approval requirements
if (await this.handleApprovalCheck(currentToolCalls, currentRound + 1, currentResponse)) {
return;
}

if (!this.hasExecutableToolCalls(currentToolCalls)) {
break;
}

// Build turn context
const turnContext: TurnContext = { numberOfTurns: currentRound + 1 };

// Resolve async functions for this turn
await this.resolveAsyncFunctionsForTurn(turnContext);

// Execute tools WITH broadcaster for real-time events
const toolResults = await this.executeToolRound(currentToolCalls, turnContext, broadcaster);

// Track execution round
this.allToolExecutionRounds.push({
round: currentRound,
toolCalls: currentToolCalls,
response: currentResponse,
toolResults,
});

// Save tool results to state
await this.saveToolResultsToState(toolResults);

// Apply nextTurnParams
await this.applyNextTurnParams(currentToolCalls);

// Make follow-up request
currentResponse = await this.makeFollowupRequest(currentResponse, toolResults);

// Save new response to state
await this.saveResponseToState(currentResponse);

currentRound++;
}

// Validate and finalize
this.validateFinalResponse(currentResponse);
this.finalResponse = currentResponse;
await this.markStateComplete();
} finally {
// Always complete the broadcaster when done
broadcaster.complete();
}
}

/**
* Internal helper to get the text after tool execution
*/
Expand Down Expand Up @@ -958,7 +1096,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
/**
* Stream all response events as they arrive.
* Multiple consumers can iterate over this stream concurrently.
* Includes preliminary tool result events after tool execution.
* Preliminary tool results are streamed in REAL-TIME as generator tools yield.
*/
getFullResponsesStream(): AsyncIterableIterator<ResponseStreamEvent<InferToolEventsUnion<TTools>>> {
return async function* (this: ModelResult<TTools>) {
Expand All @@ -967,27 +1105,32 @@ export class ModelResult<TTools extends readonly Tool[]> {
throw new Error('Stream not initialized');
}

// Create broadcaster for real-time tool events
this.toolEventBroadcaster = new ToolEventBroadcaster();
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();

// Start tool execution in background (doesn't block)
const executionPromise = this.executeToolsWithBroadcast(this.toolEventBroadcaster);

const consumer = this.reusableStream.createConsumer();

// Yield original events directly
// Yield original API events
for await (const event of consumer) {
yield event;
}

// After stream completes, check if tools were executed and emit preliminary results
await this.executeToolsIfNeeded();

// Emit all preliminary results as new event types
for (const [toolCallId, results] of this.preliminaryResults) {
for (const result of results) {
yield {
type: 'tool.preliminary_result' as const,
toolCallId,
result: result as InferToolEventsUnion<TTools>,
timestamp: Date.now(),
};
}
// Yield tool preliminary results as they arrive (real-time!)
for await (const event of toolEventConsumer) {
yield {
type: 'tool.preliminary_result' as const,
toolCallId: event.toolCallId,
result: event.result,
timestamp: Date.now(),
};
}

// Ensure execution completed (handles errors)
await executionPromise;
}.call(this);
}

Expand Down Expand Up @@ -1065,7 +1208,7 @@ export class ModelResult<TTools extends readonly Tool[]> {

/**
* Stream tool call argument deltas and preliminary results.
* This filters the full event stream to yield:
* Preliminary results are streamed in REAL-TIME as generator tools yield.
* - Tool call argument deltas as { type: "delta", content: string }
* - Preliminary results as { type: "preliminary_result", toolCallId, result }
*/
Expand All @@ -1076,27 +1219,28 @@ export class ModelResult<TTools extends readonly Tool[]> {
throw new Error('Stream not initialized');
}

// Yield tool deltas as structured events
// Create broadcaster for real-time tool events
this.toolEventBroadcaster = new ToolEventBroadcaster();
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();

// Start tool execution in background (doesn't block)
const executionPromise = this.executeToolsWithBroadcast(this.toolEventBroadcaster);

// Yield tool deltas from API stream
for await (const delta of extractToolDeltas(this.reusableStream)) {
yield {
type: 'delta' as const,
content: delta,
};
}

// After stream completes, check if tools were executed and emit preliminary results
await this.executeToolsIfNeeded();

// Emit all preliminary results
for (const [toolCallId, results] of this.preliminaryResults) {
for (const result of results) {
yield {
type: 'preliminary_result' as const,
toolCallId,
result: result as InferToolEventsUnion<TTools>,
};
}
// Yield tool events as they arrive (real-time!)
for await (const event of toolEventConsumer) {
yield event;
}

// Ensure execution completed (handles errors)
await executionPromise;
}.call(this);
}

Expand Down
Loading