From 584f94940245292de2d68906a693cf791000e5c3 Mon Sep 17 00:00:00 2001 From: Daniel Garnier-Moiroux Date: Thu, 20 Nov 2025 15:50:46 +0100 Subject: [PATCH] Basic version negotiation capabilities - We do not support full version negotiation, but this commit allows the client to the send the correct version to match that of the server. - There is one limitation: the GET /mcp request happens too early, before we have deserialized the initialization response, so we do not know the negotiated version yet. This request will have the client latest version as Mcp-Protocol-Version value. Signed-off-by: Daniel Garnier-Moiroux --- .../client/LifecycleInitializer.java | 6 +++++- .../client/McpAsyncClient.java | 2 ++ .../HttpClientStreamableHttpTransport.java | 13 ++++++++++--- .../spec/McpTransportSession.java | 4 ++-- ...eHttpVersionNegotiationIntegrationTests.java | 16 +++++++++------- .../McpTestRequestRecordingServletFilter.java | 4 ++-- .../WebClientStreamableHttpTransport.java | 13 +++++++++---- ...eHttpVersionNegotiationIntegrationTests.java | 17 +++++++++++------ ...tRequestRecordingExchangeFilterFunction.java | 5 +++-- 9 files changed, 53 insertions(+), 27 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java index f56c79a6d..07d86f40e 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java @@ -287,7 +287,9 @@ public Mono withInitialization(String actionName, Function operation.apply(res) + .contextWrite(c -> c.put(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, + res.initializeResult().protocolVersion()))); }); } @@ -319,6 +321,8 @@ private Mono doInitialize(DefaultInitialization init } return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null) + .contextWrite( + c -> c.put(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, initializeResult.protocolVersion())) .thenReturn(initializeResult); }).flatMap(initializeResult -> { initialization.cacheResult(initializeResult); diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index 2d1f4b43c..e6a09cd08 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -106,6 +106,8 @@ public class McpAsyncClient { public static final TypeRef PROGRESS_NOTIFICATION_TYPE_REF = new TypeRef<>() { }; + public static final String NEGOTIATED_PROTOCOL_VERSION = "io.modelcontextprotocol.client.negotiated-protocol-version"; + /** * Client capabilities. */ diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index c48aedbcf..e41f45ebb 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -20,6 +20,7 @@ import java.util.function.Consumer; import java.util.function.Function; +import io.modelcontextprotocol.client.McpAsyncClient; import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer; import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; @@ -193,7 +194,9 @@ private Publisher createDelete(String sessionId) { .uri(uri) .header("Cache-Control", "no-cache") .header(HttpHeaders.MCP_SESSION_ID, sessionId) - .header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion) + .header(HttpHeaders.PROTOCOL_VERSION, + ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, + this.latestSupportedProtocolVersion)) .DELETE(); var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext)); @@ -264,7 +267,9 @@ private Mono reconnect(McpTransportStream stream) { var builder = requestBuilder.uri(uri) .header(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM) .header("Cache-Control", "no-cache") - .header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion) + .header(HttpHeaders.PROTOCOL_VERSION, + connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, + this.latestSupportedProtocolVersion)) .GET(); var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext)); @@ -439,7 +444,9 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { .header(HttpHeaders.ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) .header(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON) .header(HttpHeaders.CACHE_CONTROL, "no-cache") - .header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion) + .header(HttpHeaders.PROTOCOL_VERSION, + ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, + this.latestSupportedProtocolVersion)) .POST(HttpRequest.BodyPublishers.ofString(jsonBody)); var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSession.java index 716ff0d16..68f0fc5bb 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSession.java @@ -4,10 +4,10 @@ package io.modelcontextprotocol.spec; -import org.reactivestreams.Publisher; - import java.util.Optional; +import org.reactivestreams.Publisher; + /** * An abstraction of the session as perceived from the MCP transport layer. Not to be * confused with the {@link McpSession} type that operates at the level of the JSON-RPC diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java index 12a3ef9c6..8efb6a960 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java @@ -89,7 +89,7 @@ void usesLatestVersion() { } @Test - void usesCustomLatestVersion() { + void usesServerSupportedVersion() { startTomcat(); var transport = HttpClientStreamableHttpTransport.builder("http://localhost:" + PORT) @@ -101,19 +101,21 @@ void usesCustomLatestVersion() { McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of())); var calls = requestRecordingFilter.getCalls(); - - assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\"")) - // GET /mcp ; POST notification/initialized ; POST tools/call - .hasSize(3) + // Initialize tells the server the Client's latest supported version + // FIXME: Set the correct protocol version on GET /mcp + assertThat(calls).filteredOn(c -> c.method().equals("POST") && !c.body().contains("\"method\":\"initialize\"")) + // POST notification/initialized ; POST tools/call + .hasSize(2) .map(McpTestRequestRecordingServletFilter.Call::headers) - .allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", "2263-03-18")); + .allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", + ProtocolVersions.MCP_2025_06_18)); assertThat(response).isNotNull(); assertThat(response.content()).hasSize(1) .first() .extracting(McpSchema.TextContent.class::cast) .extracting(McpSchema.TextContent::text) - .isEqualTo("2263-03-18"); + .isEqualTo(ProtocolVersions.MCP_2025_06_18); mcpServer.close(); } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/server/transport/McpTestRequestRecordingServletFilter.java b/mcp-core/src/test/java/io/modelcontextprotocol/server/transport/McpTestRequestRecordingServletFilter.java index 09f0d305d..b94552d12 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/server/transport/McpTestRequestRecordingServletFilter.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/server/transport/McpTestRequestRecordingServletFilter.java @@ -46,7 +46,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo .collect(Collectors.toUnmodifiableMap(Function.identity(), name -> String.join(",", Collections.list(req.getHeaders(name))))); var request = new CachedBodyHttpServletRequest(req); - calls.add(new Call(headers, request.getBodyAsString())); + calls.add(new Call(req.getMethod(), headers, request.getBodyAsString())); filterChain.doFilter(request, servletResponse); } else { @@ -60,7 +60,7 @@ public List getCalls() { return List.copyOf(calls); } - public record Call(Map headers, String body) { + public record Call(String method, Map headers, String body) { } diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 6b1d6ba8a..f0d3ad839 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -24,6 +24,7 @@ import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; +import io.modelcontextprotocol.client.McpAsyncClient; import io.modelcontextprotocol.json.McpJsonMapper; import io.modelcontextprotocol.json.TypeRef; import io.modelcontextprotocol.spec.ClosedMcpTransportSession; @@ -225,7 +226,9 @@ private Mono reconnect(McpTransportStream stream) { Disposable connection = webClient.get() .uri(this.endpoint) .accept(MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion) + .header(HttpHeaders.PROTOCOL_VERSION, + ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, + this.latestSupportedProtocolVersion)) .headers(httpHeaders -> { transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); if (stream != null) { @@ -288,10 +291,12 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { final AtomicReference disposableRef = new AtomicReference<>(); final McpTransportSession transportSession = this.activeSession.get(); - Disposable connection = webClient.post() + Disposable connection = Flux.deferContextual(ctx -> webClient.post() .uri(this.endpoint) .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion) + .header(HttpHeaders.PROTOCOL_VERSION, + ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, + this.latestSupportedProtocolVersion)) .headers(httpHeaders -> { transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); }) @@ -350,7 +355,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { } return this.extractError(response, sessionRepresentation); } - }) + })) .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) .onErrorComplete(t -> { // handle the error first diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxStreamableHttpVersionNegotiationIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxStreamableHttpVersionNegotiationIntegrationTests.java index 7627bd419..5d2bfda68 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxStreamableHttpVersionNegotiationIntegrationTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxStreamableHttpVersionNegotiationIntegrationTests.java @@ -27,6 +27,7 @@ import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; +import org.springframework.http.HttpMethod; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.web.reactive.function.client.WebClient; @@ -116,7 +117,7 @@ void usesLatestVersion() { } @Test - void usesCustomLatestVersion() { + void usesServerSupportedVersion() { var transport = WebClientStreamableHttpTransport .builder(WebClient.builder().baseUrl("http://localhost:" + PORT)) .supportedProtocolVersions(List.of(ProtocolVersions.MCP_2025_06_18, "2263-03-18")) @@ -128,18 +129,22 @@ void usesCustomLatestVersion() { McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of())); var calls = recordingFilterFunction.getCalls(); - assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\"")) - // GET /mcp ; POST notification/initialized ; POST tools/call - .hasSize(3) + // Initialize tells the server the Client's latest supported version + // FIXME: Set the correct protocol version on GET /mcp + assertThat(calls) + .filteredOn(c -> !c.body().contains("\"method\":\"initialize\"") && c.method().equals(HttpMethod.POST)) + // POST notification/initialized ; POST tools/call + .hasSize(2) .map(McpTestRequestRecordingExchangeFilterFunction.Call::headers) - .allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", "2263-03-18")); + .allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", + ProtocolVersions.MCP_2025_06_18)); assertThat(response).isNotNull(); assertThat(response.content()).hasSize(1) .first() .extracting(McpSchema.TextContent.class::cast) .extracting(McpSchema.TextContent::text) - .isEqualTo("2263-03-18"); + .isEqualTo(ProtocolVersions.MCP_2025_06_18); mcpServer.close(); } diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/utils/McpTestRequestRecordingExchangeFilterFunction.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/utils/McpTestRequestRecordingExchangeFilterFunction.java index 5600795c1..55129d481 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/utils/McpTestRequestRecordingExchangeFilterFunction.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/utils/McpTestRequestRecordingExchangeFilterFunction.java @@ -11,6 +11,7 @@ import reactor.core.publisher.Mono; +import org.springframework.http.HttpMethod; import org.springframework.web.reactive.function.server.HandlerFilterFunction; import org.springframework.web.reactive.function.server.HandlerFunction; import org.springframework.web.reactive.function.server.ServerRequest; @@ -34,7 +35,7 @@ public Mono filter(ServerRequest request, HandlerFunction next) .collect(Collectors.toMap(String::toLowerCase, k -> String.join(",", request.headers().header(k)))); var cr = request.bodyToMono(String.class).defaultIfEmpty("").map(body -> { - this.calls.add(new Call(headers, body)); + this.calls.add(new Call(request.method(), headers, body)); return ServerRequest.from(request).body(body).build(); }); @@ -46,7 +47,7 @@ public List getCalls() { return List.copyOf(calls); } - public record Call(Map headers, String body) { + public record Call(HttpMethod method, Map headers, String body) { }