Skip to content

Conversation

@mattapperson
Copy link
Collaborator

Summary

This PR enables real-time streaming of generator tool preliminary results - yields are streamed AS they happen during execution, instead of being batched after tool execution completes.

Key changes:

  • ToolEventBroadcaster: New push-based multi-consumer event broadcaster following the ReusableReadableStream pattern
  • Real-time streaming: getToolStream() and getFullResponsesStream() now yield preliminary results as generator tools emit them
  • Bug fix: Fixed toJSONSchema target from invalid 'openapi-3.0' to 'draft-7' (Zod v4 compatibility)

How it works

  1. When a generator tool executes, each yield is immediately pushed to ToolEventBroadcaster
  2. Consumers created via createConsumer() receive events in real-time as they're pushed
  3. Multiple consumers can be created and each gets all events from position 0
  4. The broadcaster buffers events for late-joining consumers

Files changed

File Changes
src/lib/tool-event-broadcaster.ts NEW - Push-based multi-consumer broadcaster
src/lib/model-result.ts Wire broadcaster into tool execution and streaming methods
src/lib/tool-executor.ts Fix toJSONSchema target
src/index.ts Export ToolEventBroadcaster
tests/unit/tool-event-broadcaster.test.ts NEW - 11 unit tests
tests/e2e/call-model-tools.test.ts Fix test toJSONSchema targets

Test plan

  • Unit tests for ToolEventBroadcaster (11 tests)
  • E2E tests for tool execution (23 tests)
  • Build passes
  • Lint passes

This change enables generator tool yields to be streamed to consumers AS
they happen during execution, instead of being batched after completion.

Changes:
- Add ToolEventBroadcaster for push-based multi-consumer event streaming
- Wire onPreliminaryResult callback in executeToolRound to broadcast events
- Update getToolStream() and getFullResponsesStream() to consume broadcast
- Fix Zod v4 toJSONSchema target to 'draft-7' (openapi-3.0 was invalid)
- Add comprehensive unit tests for ToolEventBroadcaster
- Fix test files to use draft-7 target
- Add buffer cleanup after all consumers complete (memory leak fix)
- Use lazy initialization for broadcaster (race condition fix)
- Add try-catch in onPreliminaryResult callback (error handling)
- Add tests for completion between consumer iterations
Comment on lines +350 to +358
try {
this.toolEventBroadcaster!.push({
type: 'preliminary_result' as const,
toolCallId: callId,
result: resultValue as InferToolEventsUnion<TTools>,
});
} catch {
// Don't crash tool execution if broadcasting fails
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this throws?... seemed pretty weird to. If anything I'd expect the push operator to gracefully handle any error inside

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants