-
Notifications
You must be signed in to change notification settings - Fork 323
mcp: add DisableListening option to StreamableClientTransport #729
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1389,6 +1389,20 @@ type StreamableClientTransport struct { | |
| // It defaults to 5. To disable retries, use a negative number. | ||
| MaxRetries int | ||
|
|
||
| // DisableListening disables receiving server-to-client notifications when no request is in flight. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we just call this "DisableStandaloneSSE" to be clear? What do other SDKs call it, if they have such a setting? |
||
| // | ||
| // By default, the client establishes a standalone long-live GET HTTP connection to the server | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. long-lived |
||
| // to receive server-initiated messages (like ToolListChangedNotification). | ||
| // https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#listening-for-messages-from-the-server | ||
| // NOTICE: Even if continuous listening is enabled, the server may not support this feature. | ||
| // | ||
| // If false (default), the client will establish the standalone SSE stream. | ||
| // If true, the client will not establish the standalone SSE stream and will only receive | ||
| // responses to its own requests. | ||
| // | ||
| // Defaults to false to maintain backward compatibility with existing behavior. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's omit this last sentence: it's documenting the change (adding a new field), not the steady state. It defaults to false because we thought the SSE stream should be established by default (which IIRC is consistent with other SDKs). I don't think we need to caveat it. |
||
| DisableListening bool | ||
|
|
||
| // TODO(rfindley): propose exporting these. | ||
| // If strict is set, the transport is in 'strict mode', where any violation | ||
| // of the MCP spec causes a failure. | ||
|
|
@@ -1416,6 +1430,28 @@ var ( | |
| reconnectInitialDelay = 1 * time.Second | ||
| ) | ||
|
|
||
| // WithDisableListening disables receiving server-to-client notifications when no request is in flight. | ||
| // | ||
| // By default, the client establishes a standalone long-live GET HTTP connection to the server | ||
| // to receive server-initiated messages. This function disables that behavior. | ||
| // | ||
| // If you want to disable continuous listening, you can either: | ||
| // | ||
| // transport := &mcp.StreamableClientTransport{ | ||
| // Endpoint: "http://localhost:8080/mcp", | ||
| // DisableListening: true, | ||
| // } | ||
| // | ||
| // Or use this convenience function: | ||
| // | ||
| // transport := &mcp.StreamableClientTransport{ | ||
| // Endpoint: "http://localhost:8080/mcp", | ||
| // } | ||
| // mcp.WithDisableListening(transport) | ||
| func WithDisableListening(transport *StreamableClientTransport) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would we have this function when there's already a field on the StreamableClientTransport? |
||
| transport.DisableListening = true | ||
| } | ||
|
|
||
| // Connect implements the [Transport] interface. | ||
| // | ||
| // The resulting [Connection] writes messages via POST requests to the | ||
|
|
@@ -1453,16 +1489,17 @@ func (t *StreamableClientTransport) Connect(ctx context.Context) (Connection, er | |
| // middleware), yet only cancel the standalone stream when the connection is closed. | ||
| connCtx, cancel := context.WithCancel(xcontext.Detach(ctx)) | ||
| conn := &streamableClientConn{ | ||
| url: t.Endpoint, | ||
| client: client, | ||
| incoming: make(chan jsonrpc.Message, 10), | ||
| done: make(chan struct{}), | ||
| maxRetries: maxRetries, | ||
| strict: t.strict, | ||
| logger: ensureLogger(t.logger), // must be non-nil for safe logging | ||
| ctx: connCtx, | ||
| cancel: cancel, | ||
| failed: make(chan struct{}), | ||
| url: t.Endpoint, | ||
| client: client, | ||
| incoming: make(chan jsonrpc.Message, 10), | ||
| done: make(chan struct{}), | ||
| maxRetries: maxRetries, | ||
| strict: t.strict, | ||
| logger: ensureLogger(t.logger), // must be non-nil for safe logging | ||
| ctx: connCtx, | ||
| cancel: cancel, | ||
| failed: make(chan struct{}), | ||
| disableListening: t.DisableListening, | ||
| } | ||
| return conn, nil | ||
| } | ||
|
|
@@ -1477,6 +1514,10 @@ type streamableClientConn struct { | |
| strict bool // from [StreamableClientTransport.strict] | ||
| logger *slog.Logger // from [StreamableClientTransport.logger] | ||
|
|
||
| // disableListening controls whether to disable the standalone SSE stream | ||
| // for receiving server-to-client notifications when no request is in flight. | ||
| disableListening bool // from [StreamableClientTransport.DisableListening] | ||
|
|
||
| // Guard calls to Close, as it may be called multiple times. | ||
| closeOnce sync.Once | ||
| closeErr error | ||
|
|
@@ -1518,7 +1559,7 @@ func (c *streamableClientConn) sessionUpdated(state clientSessionState) { | |
| c.mu.Unlock() | ||
|
|
||
| // Start the standalone SSE stream as soon as we have the initialized | ||
| // result. | ||
| // result, if continuous listening is enabled. | ||
| // | ||
| // § 2.2: The client MAY issue an HTTP GET to the MCP endpoint. This can be | ||
| // used to open an SSE stream, allowing the server to communicate to the | ||
|
|
@@ -1528,9 +1569,11 @@ func (c *streamableClientConn) sessionUpdated(state clientSessionState) { | |
| // initialized, we don't know whether the server requires a sessionID. | ||
| // | ||
| // § 2.5: A server using the Streamable HTTP transport MAY assign a session | ||
| // ID at initialization time, by including it in an Mcp-Session-Id header | ||
| // ID at initialization time, by including it in a Mcp-Session-Id header | ||
| // on the HTTP response containing the InitializeResult. | ||
| c.connectStandaloneSSE() | ||
| if !c.disableListening { | ||
| c.connectStandaloneSSE() | ||
| } | ||
| } | ||
|
|
||
| func (c *streamableClientConn) connectStandaloneSSE() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"disables the standalone GET request that receives server-initiated messages and notifications."