From 772c7952b73516f2a241c465b038a84a616dc554 Mon Sep 17 00:00:00 2001 From: "clark.cao" Date: Tue, 16 Sep 2025 13:28:34 +0800 Subject: [PATCH 1/3] [fix] when mcp server not give mcp session id ,mcp session store null as session id --- .../WebClientStreamableHttpTransport.java | 41 +++++-------- .../HttpClientStreamableHttpTransport.java | 61 +++++++++---------- 2 files changed, 43 insertions(+), 59 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 154eb4703..7fa359dc2 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -4,13 +4,11 @@ package io.modelcontextprotocol.client.transport; -import java.io.IOException; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.function.Function; - +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.spec.*; +import io.modelcontextprotocol.util.Assert; +import io.modelcontextprotocol.util.Utils; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,32 +16,23 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; +import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; - -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.json.McpJsonMapper; - -import io.modelcontextprotocol.spec.DefaultMcpTransportSession; -import io.modelcontextprotocol.spec.DefaultMcpTransportStream; -import io.modelcontextprotocol.spec.HttpHeaders; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpError; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpTransportException; -import io.modelcontextprotocol.spec.McpTransportSession; -import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; -import io.modelcontextprotocol.spec.McpTransportStream; -import io.modelcontextprotocol.spec.ProtocolVersions; -import io.modelcontextprotocol.util.Assert; -import io.modelcontextprotocol.util.Utils; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; + /** * An implementation of the Streamable HTTP protocol as defined by the * 2025-03-26 version of the MCP specification. @@ -279,8 +268,8 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { }) .bodyValue(message) .exchangeToFlux(response -> { - if (transportSession - .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) { + String mcpSessionId = response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID); + if (StringUtils.hasText(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { // Once we have a session, we try to open an async stream for // the server to send notifications and requests out-of-band. reconnect(null).contextWrite(sink.contextView()).subscribe(); diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index c73515938..2cce1fb27 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -4,6 +4,25 @@ package io.modelcontextprotocol.client.transport; +import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; +import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.spec.*; +import io.modelcontextprotocol.util.Assert; +import io.modelcontextprotocol.util.Utils; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; @@ -12,42 +31,13 @@ import java.net.http.HttpResponse.BodyHandler; import java.time.Duration; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; -import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.json.McpJsonMapper; - -import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; -import io.modelcontextprotocol.common.McpTransportContext; -import io.modelcontextprotocol.spec.DefaultMcpTransportSession; -import io.modelcontextprotocol.spec.DefaultMcpTransportStream; -import io.modelcontextprotocol.spec.HttpHeaders; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpTransportException; -import io.modelcontextprotocol.spec.McpTransportSession; -import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; -import io.modelcontextprotocol.spec.McpTransportStream; -import io.modelcontextprotocol.spec.ProtocolVersions; -import io.modelcontextprotocol.util.Assert; -import io.modelcontextprotocol.util.Utils; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - /** * An implementation of the Streamable HTTP protocol as defined by the * 2025-03-26 version of the MCP specification. @@ -87,7 +77,9 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { */ private final HttpClient httpClient; - /** HTTP request builder for building requests to send messages to the server */ + /** + * HTTP request builder for building requests to send messages to the server + */ private final HttpRequest.Builder requestBuilder; /** @@ -442,8 +434,11 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); })).flatMap(responseEvent -> { - if (transportSession.markInitialized( - responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) { + String mcpSessionId = responseEvent.responseInfo() + .headers() + .firstValue("mcp-session-id") + .orElseGet(() -> null); + if (Objects.nonNull(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { // Once we have a session, we try to open an async stream for // the server to send notifications and requests out-of-band. From eb31fed8fb5a77be0cbe2f02474ebedbcfeae3fe Mon Sep 17 00:00:00 2001 From: "clark.cao" Date: Wed, 17 Sep 2025 12:01:07 +0800 Subject: [PATCH 2/3] [fix] when mcp server not give mcp session id , --- .../WebClientStreamableHttpTransport.java | 286 +++++---- .../HttpClientStreamableHttpTransport.java | 605 +++++++++--------- 2 files changed, 456 insertions(+), 435 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 7fa359dc2..1ac5e3615 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -4,11 +4,13 @@ package io.modelcontextprotocol.client.transport; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.spec.*; -import io.modelcontextprotocol.util.Assert; -import io.modelcontextprotocol.util.Utils; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; + import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,19 +22,29 @@ import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; + +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.json.McpJsonMapper; + +import io.modelcontextprotocol.spec.DefaultMcpTransportSession; +import io.modelcontextprotocol.spec.DefaultMcpTransportStream; +import io.modelcontextprotocol.spec.HttpHeaders; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpError; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpTransportException; +import io.modelcontextprotocol.spec.McpTransportSession; +import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; +import io.modelcontextprotocol.spec.McpTransportStream; +import io.modelcontextprotocol.spec.ProtocolVersions; +import io.modelcontextprotocol.util.Assert; +import io.modelcontextprotocol.util.Utils; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; -import java.io.IOException; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.function.Function; - /** * An implementation of the Streamable HTTP protocol as defined by the * 2025-03-26 version of the MCP specification. @@ -94,7 +106,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport { private final AtomicReference> exceptionHandler = new AtomicReference<>(); private WebClientStreamableHttpTransport(McpJsonMapper jsonMapper, WebClient.Builder webClientBuilder, - String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) { + String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) { this.jsonMapper = jsonMapper; this.webClient = webClientBuilder.build(); this.endpoint = endpoint; @@ -134,16 +146,16 @@ public Mono connect(Function, Mono> onClose = sessionId -> sessionId == null ? Mono.empty() : webClient.delete() - .uri(this.endpoint) - .header(HttpHeaders.MCP_SESSION_ID, sessionId) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .retrieve() - .toBodilessEntity() - .onErrorComplete(e -> { - logger.warn("Got error when closing transport", e); - return true; - }) - .then(); + .uri(this.endpoint) + .header(HttpHeaders.MCP_SESSION_ID, sessionId) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .retrieve() + .toBodilessEntity() + .onErrorComplete(e -> { + logger.warn("Got error when closing transport", e); + return true; + }) + .then(); return new DefaultMcpTransportSession(onClose); } @@ -194,52 +206,52 @@ private Mono reconnect(McpTransportStream stream) { final McpTransportSession transportSession = this.activeSession.get(); Disposable connection = webClient.get() - .uri(this.endpoint) - .accept(MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .headers(httpHeaders -> { - transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); - if (stream != null) { - stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id)); - } - }) - .exchangeToFlux(response -> { - if (isEventStream(response)) { - logger.debug("Established SSE stream via GET"); - return eventStream(stream, response); - } - else if (isNotAllowed(response)) { - logger.debug("The server does not support SSE streams, using request-response mode."); - return Flux.empty(); - } - else if (isNotFound(response)) { - if (transportSession.sessionId().isPresent()) { - String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); - return mcpSessionNotFoundError(sessionIdRepresentation); + .uri(this.endpoint) + .accept(MediaType.TEXT_EVENT_STREAM) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .headers(httpHeaders -> { + transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); + if (stream != null) { + stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id)); + } + }) + .exchangeToFlux(response -> { + if (isEventStream(response)) { + logger.debug("Established SSE stream via GET"); + return eventStream(stream, response); + } + else if (isNotAllowed(response)) { + logger.debug("The server does not support SSE streams, using request-response mode."); + return Flux.empty(); + } + else if (isNotFound(response)) { + if (transportSession.sessionId().isPresent()) { + String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); + return mcpSessionNotFoundError(sessionIdRepresentation); + } + else { + return this.extractError(response, MISSING_SESSION_ID); + } } else { - return this.extractError(response, MISSING_SESSION_ID); + return response.createError().doOnError(e -> { + logger.info("Opening an SSE stream failed. This can be safely ignored.", e); + }).flux(); } - } - else { - return response.createError().doOnError(e -> { - logger.info("Opening an SSE stream failed. This can be safely ignored.", e); - }).flux(); - } - }) - .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) - .onErrorComplete(t -> { - this.handleException(t); - return true; - }) - .doFinally(s -> { - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); - } - }) - .contextWrite(ctx) - .subscribe(); + }) + .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) + .onErrorComplete(t -> { + this.handleException(t); + return true; + }) + .doFinally(s -> { + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + }) + .contextWrite(ctx) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); @@ -260,83 +272,83 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { final McpTransportSession transportSession = this.activeSession.get(); Disposable connection = webClient.post() - .uri(this.endpoint) - .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .headers(httpHeaders -> { - transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); - }) - .bodyValue(message) - .exchangeToFlux(response -> { - String mcpSessionId = response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID); - if (StringUtils.hasText(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - reconnect(null).contextWrite(sink.contextView()).subscribe(); - } - - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); - - // The spec mentions only ACCEPTED, but the existing SDKs can return - // 200 OK for notifications - if (response.statusCode().is2xxSuccessful()) { - Optional contentType = response.headers().contentType(); - // Existing SDKs consume notifications with no response body nor - // content type - if (contentType.isEmpty()) { - logger.trace("Message was successfully sent via POST for session {}", - sessionRepresentation); - // signal the caller that the message was successfully - // delivered - sink.success(); - // communicate to downstream there is no streamed data coming - return Flux.empty(); + .uri(this.endpoint) + .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .headers(httpHeaders -> { + transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); + }) + .bodyValue(message) + .exchangeToFlux(response -> { + String mcpSessionId = response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID); + if (StringUtils.hasText(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + reconnect(null).contextWrite(sink.contextView()).subscribe(); } - else { - MediaType mediaType = contentType.get(); - if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) { - logger.debug("Established SSE stream via POST"); - // communicate to caller that the message was delivered - sink.success(); - // starting a stream - return newEventStream(response, sessionRepresentation); - } - else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { - logger.trace("Received response to POST for session {}", sessionRepresentation); - // communicate to caller the message was delivered + + String sessionRepresentation = sessionIdOrPlaceholder(transportSession); + + // The spec mentions only ACCEPTED, but the existing SDKs can return + // 200 OK for notifications + if (response.statusCode().is2xxSuccessful()) { + Optional contentType = response.headers().contentType(); + // Existing SDKs consume notifications with no response body nor + // content type + if (contentType.isEmpty()) { + logger.trace("Message was successfully sent via POST for session {}", + sessionRepresentation); + // signal the caller that the message was successfully + // delivered sink.success(); - return directResponseFlux(message, response); + // communicate to downstream there is no streamed data coming + return Flux.empty(); } else { - logger.warn("Unknown media type {} returned for POST in session {}", contentType, - sessionRepresentation); - return Flux.error(new RuntimeException("Unknown media type returned: " + contentType)); + MediaType mediaType = contentType.get(); + if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) { + logger.debug("Established SSE stream via POST"); + // communicate to caller that the message was delivered + sink.success(); + // starting a stream + return newEventStream(response, sessionRepresentation); + } + else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { + logger.trace("Received response to POST for session {}", sessionRepresentation); + // communicate to caller the message was delivered + sink.success(); + return directResponseFlux(message, response); + } + else { + logger.warn("Unknown media type {} returned for POST in session {}", contentType, + sessionRepresentation); + return Flux.error(new RuntimeException("Unknown media type returned: " + contentType)); + } } } - } - else { - if (isNotFound(response) && !sessionRepresentation.equals(MISSING_SESSION_ID)) { - return mcpSessionNotFoundError(sessionRepresentation); + else { + if (isNotFound(response) && !sessionRepresentation.equals(MISSING_SESSION_ID)) { + return mcpSessionNotFoundError(sessionRepresentation); + } + return this.extractError(response, sessionRepresentation); } - return this.extractError(response, sessionRepresentation); - } - }) - .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) - .onErrorComplete(t -> { - // handle the error first - this.handleException(t); - // inform the caller of sendMessage - sink.error(t); - return true; - }) - .doFinally(s -> { - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); - } - }) - .contextWrite(sink.contextView()) - .subscribe(); + }) + .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) + .onErrorComplete(t -> { + // handle the error first + this.handleException(t); + // inform the caller of sendMessage + sink.error(t); + return true; + }) + .doFinally(s -> { + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + }) + .contextWrite(sink.contextView()) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); }); @@ -407,7 +419,7 @@ private static String sessionIdOrPlaceholder(McpTransportSession transportSes } private Flux directResponseFlux(McpSchema.JSONRPCMessage sentMessage, - ClientResponse response) { + ClientResponse response) { return response.bodyToMono(String.class).>handle((responseMessage, s) -> { try { if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 2cce1fb27..b9e9b0069 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -4,25 +4,6 @@ package io.modelcontextprotocol.client.transport; -import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; -import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.common.McpTransportContext; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.spec.*; -import io.modelcontextprotocol.util.Assert; -import io.modelcontextprotocol.util.Utils; -import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; @@ -38,6 +19,36 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.json.McpJsonMapper; + +import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.spec.DefaultMcpTransportSession; +import io.modelcontextprotocol.spec.DefaultMcpTransportStream; +import io.modelcontextprotocol.spec.HttpHeaders; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpTransportException; +import io.modelcontextprotocol.spec.McpTransportSession; +import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; +import io.modelcontextprotocol.spec.McpTransportStream; +import io.modelcontextprotocol.spec.ProtocolVersions; +import io.modelcontextprotocol.util.Assert; +import io.modelcontextprotocol.util.Utils; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + /** * An implementation of the Streamable HTTP protocol as defined by the * 2025-03-26 version of the MCP specification. @@ -77,9 +88,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { */ private final HttpClient httpClient; - /** - * HTTP request builder for building requests to send messages to the server - */ + /** HTTP request builder for building requests to send messages to the server */ private final HttpRequest.Builder requestBuilder; /** @@ -117,8 +126,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private final AtomicReference> exceptionHandler = new AtomicReference<>(); private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, - HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, - boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { + HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, + boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -165,11 +174,11 @@ private Publisher createDelete(String sessionId) { var uri = Utils.resolveUri(this.baseUri, this.endpoint); return Mono.deferContextual(ctx -> { var builder = this.requestBuilder.copy() - .uri(uri) - .header("Cache-Control", "no-cache") - .header(HttpHeaders.MCP_SESSION_ID, sessionId) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .DELETE(); + .uri(uri) + .header("Cache-Control", "no-cache") + .header(HttpHeaders.MCP_SESSION_ID, sessionId) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .DELETE(); var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext)); }).flatMap(requestBuilder -> { @@ -225,131 +234,131 @@ private Mono reconnect(McpTransportStream stream) { var uri = Utils.resolveUri(this.baseUri, this.endpoint); Disposable connection = Mono.deferContextual(connectionCtx -> { - HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); + HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); - if (transportSession != null && transportSession.sessionId().isPresent()) { - requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, - transportSession.sessionId().get()); - } + if (transportSession != null && transportSession.sessionId().isPresent()) { + requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, + transportSession.sessionId().get()); + } - if (stream != null && stream.lastId().isPresent()) { - requestBuilder = requestBuilder.header(HttpHeaders.LAST_EVENT_ID, stream.lastId().get()); - } + if (stream != null && stream.lastId().isPresent()) { + requestBuilder = requestBuilder.header(HttpHeaders.LAST_EVENT_ID, stream.lastId().get()); + } - var builder = requestBuilder.uri(uri) - .header("Accept", TEXT_EVENT_STREAM) - .header("Cache-Control", "no-cache") - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .GET(); - var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); - return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext)); - }) - .flatMapMany( - requestBuilder -> Flux.create( - sseSink -> this.httpClient - .sendAsync(requestBuilder.build(), - responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, - sseSink)) - .whenComplete((response, throwable) -> { - if (throwable != null) { - sseSink.error(throwable); - } - else { - logger.debug("SSE connection established successfully"); - } - })) - .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent) - .flatMap(responseEvent -> { - int statusCode = responseEvent.responseInfo().statusCode(); - - if (statusCode >= 200 && statusCode < 300) { - - if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { - try { - // We don't support batching ATM and probably - // won't since the next version considers - // removing it. - McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage( - this.jsonMapper, responseEvent.sseEvent().data()); - - Tuple2, Iterable> idWithMessages = Tuples - .of(Optional.ofNullable(responseEvent.sseEvent().id()), - List.of(message)); - - McpTransportStream sessionStream = stream != null ? stream - : new DefaultMcpTransportStream<>(this.resumableStreams, + var builder = requestBuilder.uri(uri) + .header("Accept", TEXT_EVENT_STREAM) + .header("Cache-Control", "no-cache") + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .GET(); + var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); + return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext)); + }) + .flatMapMany( + requestBuilder -> Flux.create( + sseSink -> this.httpClient + .sendAsync(requestBuilder.build(), + responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, + sseSink)) + .whenComplete((response, throwable) -> { + if (throwable != null) { + sseSink.error(throwable); + } + else { + logger.debug("SSE connection established successfully"); + } + })) + .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent) + .flatMap(responseEvent -> { + int statusCode = responseEvent.responseInfo().statusCode(); + + if (statusCode >= 200 && statusCode < 300) { + + if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { + try { + // We don't support batching ATM and probably + // won't since the next version considers + // removing it. + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage( + this.jsonMapper, responseEvent.sseEvent().data()); + + Tuple2, Iterable> idWithMessages = Tuples + .of(Optional.ofNullable(responseEvent.sseEvent().id()), + List.of(message)); + + McpTransportStream sessionStream = stream != null ? stream + : new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect); - logger.debug("Connected stream {}", sessionStream.streamId()); - - return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); - + logger.debug("Connected stream {}", sessionStream.streamId()); + + return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); + + } + catch (IOException ioException) { + return Flux.error(new McpTransportException( + "Error parsing JSON-RPC message: " + responseEvent, ioException)); + } + } + else { + logger.debug("Received SSE event with type: {}", responseEvent.sseEvent()); + return Flux.empty(); + } } - catch (IOException ioException) { - return Flux.error(new McpTransportException( - "Error parsing JSON-RPC message: " + responseEvent, ioException)); + else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed + logger + .debug("The server does not support SSE streams, using request-response mode."); + return Flux.empty(); } - } - else { - logger.debug("Received SSE event with type: {}", responseEvent.sseEvent()); - return Flux.empty(); - } - } - else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed - logger - .debug("The server does not support SSE streams, using request-response mode."); - return Flux.empty(); - } - else if (statusCode == NOT_FOUND) { - - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id - // and the response is 404, we consider it a - // session not found error. - logger.debug("Session not found for session ID: {}", - transportSession.sessionId().get()); - String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionIdRepresentation); - return Flux.error(exception); - } - return Flux.error( - new McpTransportException("Server Not Found. Status code:" + statusCode - + ", response-event:" + responseEvent)); - } - else if (statusCode == BAD_REQUEST) { - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id - // and thre response is 404, we consider it a - // session not found error. - String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionIdRepresentation); - return Flux.error(exception); - } - return Flux.error( - new McpTransportException("Bad Request. Status code:" + statusCode - + ", response-event:" + responseEvent)); + else if (statusCode == NOT_FOUND) { + + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id + // and the response is 404, we consider it a + // session not found error. + logger.debug("Session not found for session ID: {}", + transportSession.sessionId().get()); + String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionIdRepresentation); + return Flux.error(exception); + } + return Flux.error( + new McpTransportException("Server Not Found. Status code:" + statusCode + + ", response-event:" + responseEvent)); + } + else if (statusCode == BAD_REQUEST) { + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id + // and thre response is 404, we consider it a + // session not found error. + String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionIdRepresentation); + return Flux.error(exception); + } + return Flux.error( + new McpTransportException("Bad Request. Status code:" + statusCode + + ", response-event:" + responseEvent)); - } + } - return Flux.error(new McpTransportException( - "Received unrecognized SSE event type: " + responseEvent.sseEvent().event())); - }).flatMap( + return Flux.error(new McpTransportException( + "Received unrecognized SSE event type: " + responseEvent.sseEvent().event())); + }).flatMap( jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) - .onErrorMap(CompletionException.class, t -> t.getCause()) - .onErrorComplete(t -> { - this.handleException(t); - return true; - }) - .doFinally(s -> { - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); - } - })) - .contextWrite(ctx) - .subscribe(); + .onErrorMap(CompletionException.class, t -> t.getCause()) + .onErrorComplete(t -> { + this.handleException(t); + return true; + }) + .doFinally(s -> { + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + })) + .contextWrite(ctx) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); @@ -403,169 +412,169 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { String jsonBody = this.toString(sentMessage); Disposable connection = Mono.deferContextual(ctx -> { - HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); - - if (transportSession != null && transportSession.sessionId().isPresent()) { - requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, - transportSession.sessionId().get()); - } + HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); - var builder = requestBuilder.uri(uri) - .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) - .header("Content-Type", APPLICATION_JSON) - .header("Cache-Control", "no-cache") - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .POST(HttpRequest.BodyPublishers.ofString(jsonBody)); - var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); - return Mono - .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext)); - }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { - - // Create the async request with proper body subscriber selection - Mono.fromFuture(this.httpClient - .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) - .whenComplete((response, throwable) -> { - if (throwable != null) { - responseEventSink.error(throwable); + if (transportSession != null && transportSession.sessionId().isPresent()) { + requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, + transportSession.sessionId().get()); } - else { - logger.debug("SSE connection established successfully"); - } - })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); - - })).flatMap(responseEvent -> { - String mcpSessionId = responseEvent.responseInfo() - .headers() - .firstValue("mcp-session-id") - .orElseGet(() -> null); - if (Objects.nonNull(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - - reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); - } - - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); - - int statusCode = responseEvent.responseInfo().statusCode(); - - if (statusCode >= 200 && statusCode < 300) { - - String contentType = responseEvent.responseInfo() - .headers() - .firstValue("Content-Type") - .orElse("") - .toLowerCase(); - - if (contentType.isBlank()) { - logger.debug("No content type returned for POST in session {}", sessionRepresentation); - // No content type means no response body, so we can just - // return - // an empty stream - deliveredSink.success(); - return Flux.empty(); - } - else if (contentType.contains(TEXT_EVENT_STREAM)) { - return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent()) - .flatMap(sseEvent -> { - try { - // We don't support batching ATM and probably - // won't - // since the - // next version considers removing it. - McpSchema.JSONRPCMessage message = McpSchema - .deserializeJsonRpcMessage(this.jsonMapper, sseEvent.data()); - - Tuple2, Iterable> idWithMessages = Tuples - .of(Optional.ofNullable(sseEvent.id()), List.of(message)); - McpTransportStream sessionStream = new DefaultMcpTransportStream<>( - this.resumableStreams, this::reconnect); - - logger.debug("Connected stream {}", sessionStream.streamId()); + var builder = requestBuilder.uri(uri) + .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) + .header("Content-Type", APPLICATION_JSON) + .header("Cache-Control", "no-cache") + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .POST(HttpRequest.BodyPublishers.ofString(jsonBody)); + var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); + return Mono + .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext)); + }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { + + // Create the async request with proper body subscriber selection + Mono.fromFuture(this.httpClient + .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) + .whenComplete((response, throwable) -> { + if (throwable != null) { + responseEventSink.error(throwable); + } + else { + logger.debug("SSE connection established successfully"); + } + })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); + + })).flatMap(responseEvent -> { + String mcpSessionId = responseEvent.responseInfo() + .headers() + .firstValue("mcp-session-id") + .orElseGet(() -> null); + if (Objects.nonNull(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + + reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); + } - deliveredSink.success(); + String sessionRepresentation = sessionIdOrPlaceholder(transportSession); + + int statusCode = responseEvent.responseInfo().statusCode(); + + if (statusCode >= 200 && statusCode < 300) { + + String contentType = responseEvent.responseInfo() + .headers() + .firstValue("Content-Type") + .orElse("") + .toLowerCase(); + + if (contentType.isBlank()) { + logger.debug("No content type returned for POST in session {}", sessionRepresentation); + // No content type means no response body, so we can just + // return + // an empty stream + deliveredSink.success(); + return Flux.empty(); + } + else if (contentType.contains(TEXT_EVENT_STREAM)) { + return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent()) + .flatMap(sseEvent -> { + try { + // We don't support batching ATM and probably + // won't + // since the + // next version considers removing it. + McpSchema.JSONRPCMessage message = McpSchema + .deserializeJsonRpcMessage(this.jsonMapper, sseEvent.data()); + + Tuple2, Iterable> idWithMessages = Tuples + .of(Optional.ofNullable(sseEvent.id()), List.of(message)); + + McpTransportStream sessionStream = new DefaultMcpTransportStream<>( + this.resumableStreams, this::reconnect); + + logger.debug("Connected stream {}", sessionStream.streamId()); + + deliveredSink.success(); + + return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); + } + catch (IOException ioException) { + return Flux.error(new McpTransportException( + "Error parsing JSON-RPC message: " + responseEvent, ioException)); + } + }); + } + else if (contentType.contains(APPLICATION_JSON)) { + deliveredSink.success(); + String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); + if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { + logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data); + return Mono.empty(); + } - return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); + try { + return Mono.just(McpSchema.deserializeJsonRpcMessage(jsonMapper, data)); } - catch (IOException ioException) { - return Flux.error(new McpTransportException( - "Error parsing JSON-RPC message: " + responseEvent, ioException)); + catch (IOException e) { + return Mono.error(new McpTransportException( + "Error deserializing JSON-RPC message: " + responseEvent, e)); } - }); - } - else if (contentType.contains(APPLICATION_JSON)) { - deliveredSink.success(); - String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); - if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { - logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data); - return Mono.empty(); - } + } + logger.warn("Unknown media type {} returned for POST in session {}", contentType, + sessionRepresentation); - try { - return Mono.just(McpSchema.deserializeJsonRpcMessage(jsonMapper, data)); + return Flux.error( + new RuntimeException("Unknown media type returned: " + contentType)); } - catch (IOException e) { - return Mono.error(new McpTransportException( - "Error deserializing JSON-RPC message: " + responseEvent, e)); + else if (statusCode == NOT_FOUND) { + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id and the + // response is 404, we consider it a session not found error. + logger.debug("Session not found for session ID: {}", transportSession.sessionId().get()); + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionRepresentation); + return Flux.error(exception); + } + return Flux.error(new McpTransportException( + "Server Not Found. Status code:" + statusCode + ", response-event:" + responseEvent)); } - } - logger.warn("Unknown media type {} returned for POST in session {}", contentType, - sessionRepresentation); - - return Flux.error( - new RuntimeException("Unknown media type returned: " + contentType)); - } - else if (statusCode == NOT_FOUND) { - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id and the - // response is 404, we consider it a session not found error. - logger.debug("Session not found for session ID: {}", transportSession.sessionId().get()); - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionRepresentation); - return Flux.error(exception); - } - return Flux.error(new McpTransportException( - "Server Not Found. Status code:" + statusCode + ", response-event:" + responseEvent)); - } - else if (statusCode == BAD_REQUEST) { - // Some implementations can return 400 when presented with a - // session id that it doesn't know about, so we will - // invalidate the session - // https://github.com/modelcontextprotocol/typescript-sdk/issues/389 - - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id and the - // response is 404, we consider it a session not found error. - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionRepresentation); - return Flux.error(exception); - } - return Flux.error(new McpTransportException( - "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent)); - } - - return Flux.error( - new RuntimeException("Failed to send message: " + responseEvent)); - }) - .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) - .onErrorMap(CompletionException.class, t -> t.getCause()) - .onErrorComplete(t -> { - // handle the error first - this.handleException(t); - // inform the caller of sendMessage - deliveredSink.error(t); - return true; - }) - .doFinally(s -> { - logger.debug("SendMessage finally: {}", s); - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); - } - }) - .contextWrite(deliveredSink.contextView()) - .subscribe(); + else if (statusCode == BAD_REQUEST) { + // Some implementations can return 400 when presented with a + // session id that it doesn't know about, so we will + // invalidate the session + // https://github.com/modelcontextprotocol/typescript-sdk/issues/389 + + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id and the + // response is 404, we consider it a session not found error. + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionRepresentation); + return Flux.error(exception); + } + return Flux.error(new McpTransportException( + "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent)); + } + + return Flux.error( + new RuntimeException("Failed to send message: " + responseEvent)); + }) + .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) + .onErrorMap(CompletionException.class, t -> t.getCause()) + .onErrorComplete(t -> { + // handle the error first + this.handleException(t); + // inform the caller of sendMessage + deliveredSink.error(t); + return true; + }) + .doFinally(s -> { + logger.debug("SendMessage finally: {}", s); + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + }) + .contextWrite(deliveredSink.contextView()) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); From 8e97e264a6a7c19fb8bd2b9989fe7bc6cdab0d8d Mon Sep 17 00:00:00 2001 From: "clark.cao" Date: Wed, 17 Sep 2025 12:13:55 +0800 Subject: [PATCH 3/3] [fix] when mcp server not give mcp session id , --- .../WebClientStreamableHttpTransport.java | 250 ++++---- .../HttpClientStreamableHttpTransport.java | 549 +++++++++--------- 2 files changed, 398 insertions(+), 401 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 1ac5e3615..f6eeb0b1f 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -106,7 +106,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport { private final AtomicReference> exceptionHandler = new AtomicReference<>(); private WebClientStreamableHttpTransport(McpJsonMapper jsonMapper, WebClient.Builder webClientBuilder, - String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) { + String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) { this.jsonMapper = jsonMapper; this.webClient = webClientBuilder.build(); this.endpoint = endpoint; @@ -146,16 +146,16 @@ public Mono connect(Function, Mono> onClose = sessionId -> sessionId == null ? Mono.empty() : webClient.delete() - .uri(this.endpoint) - .header(HttpHeaders.MCP_SESSION_ID, sessionId) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .retrieve() - .toBodilessEntity() - .onErrorComplete(e -> { - logger.warn("Got error when closing transport", e); - return true; - }) - .then(); + .uri(this.endpoint) + .header(HttpHeaders.MCP_SESSION_ID, sessionId) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .retrieve() + .toBodilessEntity() + .onErrorComplete(e -> { + logger.warn("Got error when closing transport", e); + return true; + }) + .then(); return new DefaultMcpTransportSession(onClose); } @@ -206,52 +206,52 @@ private Mono reconnect(McpTransportStream stream) { final McpTransportSession transportSession = this.activeSession.get(); Disposable connection = webClient.get() - .uri(this.endpoint) - .accept(MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .headers(httpHeaders -> { - transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); - if (stream != null) { - stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id)); - } - }) - .exchangeToFlux(response -> { - if (isEventStream(response)) { - logger.debug("Established SSE stream via GET"); - return eventStream(stream, response); - } - else if (isNotAllowed(response)) { - logger.debug("The server does not support SSE streams, using request-response mode."); - return Flux.empty(); - } - else if (isNotFound(response)) { - if (transportSession.sessionId().isPresent()) { - String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); - return mcpSessionNotFoundError(sessionIdRepresentation); - } - else { - return this.extractError(response, MISSING_SESSION_ID); - } + .uri(this.endpoint) + .accept(MediaType.TEXT_EVENT_STREAM) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .headers(httpHeaders -> { + transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); + if (stream != null) { + stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id)); + } + }) + .exchangeToFlux(response -> { + if (isEventStream(response)) { + logger.debug("Established SSE stream via GET"); + return eventStream(stream, response); + } + else if (isNotAllowed(response)) { + logger.debug("The server does not support SSE streams, using request-response mode."); + return Flux.empty(); + } + else if (isNotFound(response)) { + if (transportSession.sessionId().isPresent()) { + String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); + return mcpSessionNotFoundError(sessionIdRepresentation); } else { - return response.createError().doOnError(e -> { - logger.info("Opening an SSE stream failed. This can be safely ignored.", e); - }).flux(); + return this.extractError(response, MISSING_SESSION_ID); } - }) - .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) - .onErrorComplete(t -> { - this.handleException(t); - return true; - }) - .doFinally(s -> { - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); - } - }) - .contextWrite(ctx) - .subscribe(); + } + else { + return response.createError().doOnError(e -> { + logger.info("Opening an SSE stream failed. This can be safely ignored.", e); + }).flux(); + } + }) + .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) + .onErrorComplete(t -> { + this.handleException(t); + return true; + }) + .doFinally(s -> { + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + }) + .contextWrite(ctx) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); @@ -272,83 +272,83 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { final McpTransportSession transportSession = this.activeSession.get(); Disposable connection = webClient.post() - .uri(this.endpoint) - .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .headers(httpHeaders -> { - transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); - }) - .bodyValue(message) - .exchangeToFlux(response -> { - String mcpSessionId = response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID); - if (StringUtils.hasText(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - reconnect(null).contextWrite(sink.contextView()).subscribe(); + .uri(this.endpoint) + .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .headers(httpHeaders -> { + transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id)); + }) + .bodyValue(message) + .exchangeToFlux(response -> { + String mcpSessionId = response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID); + if (StringUtils.hasText(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + reconnect(null).contextWrite(sink.contextView()).subscribe(); + } + + String sessionRepresentation = sessionIdOrPlaceholder(transportSession); + + // The spec mentions only ACCEPTED, but the existing SDKs can return + // 200 OK for notifications + if (response.statusCode().is2xxSuccessful()) { + Optional contentType = response.headers().contentType(); + // Existing SDKs consume notifications with no response body nor + // content type + if (contentType.isEmpty()) { + logger.trace("Message was successfully sent via POST for session {}", + sessionRepresentation); + // signal the caller that the message was successfully + // delivered + sink.success(); + // communicate to downstream there is no streamed data coming + return Flux.empty(); } - - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); - - // The spec mentions only ACCEPTED, but the existing SDKs can return - // 200 OK for notifications - if (response.statusCode().is2xxSuccessful()) { - Optional contentType = response.headers().contentType(); - // Existing SDKs consume notifications with no response body nor - // content type - if (contentType.isEmpty()) { - logger.trace("Message was successfully sent via POST for session {}", - sessionRepresentation); - // signal the caller that the message was successfully - // delivered + else { + MediaType mediaType = contentType.get(); + if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) { + logger.debug("Established SSE stream via POST"); + // communicate to caller that the message was delivered sink.success(); - // communicate to downstream there is no streamed data coming - return Flux.empty(); + // starting a stream + return newEventStream(response, sessionRepresentation); } - else { - MediaType mediaType = contentType.get(); - if (mediaType.isCompatibleWith(MediaType.TEXT_EVENT_STREAM)) { - logger.debug("Established SSE stream via POST"); - // communicate to caller that the message was delivered - sink.success(); - // starting a stream - return newEventStream(response, sessionRepresentation); - } - else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { - logger.trace("Received response to POST for session {}", sessionRepresentation); - // communicate to caller the message was delivered - sink.success(); - return directResponseFlux(message, response); - } - else { - logger.warn("Unknown media type {} returned for POST in session {}", contentType, - sessionRepresentation); - return Flux.error(new RuntimeException("Unknown media type returned: " + contentType)); - } + else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { + logger.trace("Received response to POST for session {}", sessionRepresentation); + // communicate to caller the message was delivered + sink.success(); + return directResponseFlux(message, response); } - } - else { - if (isNotFound(response) && !sessionRepresentation.equals(MISSING_SESSION_ID)) { - return mcpSessionNotFoundError(sessionRepresentation); + else { + logger.warn("Unknown media type {} returned for POST in session {}", contentType, + sessionRepresentation); + return Flux.error(new RuntimeException("Unknown media type returned: " + contentType)); } - return this.extractError(response, sessionRepresentation); } - }) - .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) - .onErrorComplete(t -> { - // handle the error first - this.handleException(t); - // inform the caller of sendMessage - sink.error(t); - return true; - }) - .doFinally(s -> { - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); + } + else { + if (isNotFound(response) && !sessionRepresentation.equals(MISSING_SESSION_ID)) { + return mcpSessionNotFoundError(sessionRepresentation); } - }) - .contextWrite(sink.contextView()) - .subscribe(); + return this.extractError(response, sessionRepresentation); + } + }) + .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) + .onErrorComplete(t -> { + // handle the error first + this.handleException(t); + // inform the caller of sendMessage + sink.error(t); + return true; + }) + .doFinally(s -> { + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + }) + .contextWrite(sink.contextView()) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); }); @@ -419,7 +419,7 @@ private static String sessionIdOrPlaceholder(McpTransportSession transportSes } private Flux directResponseFlux(McpSchema.JSONRPCMessage sentMessage, - ClientResponse response) { + ClientResponse response) { return response.bodyToMono(String.class).>handle((responseMessage, s) -> { try { if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index b9e9b0069..fff8d5d97 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -126,8 +126,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private final AtomicReference> exceptionHandler = new AtomicReference<>(); private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, - HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, - boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { + HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, + boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) { this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -174,11 +174,11 @@ private Publisher createDelete(String sessionId) { var uri = Utils.resolveUri(this.baseUri, this.endpoint); return Mono.deferContextual(ctx -> { var builder = this.requestBuilder.copy() - .uri(uri) - .header("Cache-Control", "no-cache") - .header(HttpHeaders.MCP_SESSION_ID, sessionId) - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .DELETE(); + .uri(uri) + .header("Cache-Control", "no-cache") + .header(HttpHeaders.MCP_SESSION_ID, sessionId) + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .DELETE(); var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext)); }).flatMap(requestBuilder -> { @@ -234,131 +234,131 @@ private Mono reconnect(McpTransportStream stream) { var uri = Utils.resolveUri(this.baseUri, this.endpoint); Disposable connection = Mono.deferContextual(connectionCtx -> { - HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); + HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); - if (transportSession != null && transportSession.sessionId().isPresent()) { - requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, - transportSession.sessionId().get()); - } + if (transportSession != null && transportSession.sessionId().isPresent()) { + requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, + transportSession.sessionId().get()); + } - if (stream != null && stream.lastId().isPresent()) { - requestBuilder = requestBuilder.header(HttpHeaders.LAST_EVENT_ID, stream.lastId().get()); - } + if (stream != null && stream.lastId().isPresent()) { + requestBuilder = requestBuilder.header(HttpHeaders.LAST_EVENT_ID, stream.lastId().get()); + } - var builder = requestBuilder.uri(uri) - .header("Accept", TEXT_EVENT_STREAM) - .header("Cache-Control", "no-cache") - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .GET(); - var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); - return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext)); - }) - .flatMapMany( - requestBuilder -> Flux.create( - sseSink -> this.httpClient - .sendAsync(requestBuilder.build(), - responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, - sseSink)) - .whenComplete((response, throwable) -> { - if (throwable != null) { - sseSink.error(throwable); - } - else { - logger.debug("SSE connection established successfully"); - } - })) - .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent) - .flatMap(responseEvent -> { - int statusCode = responseEvent.responseInfo().statusCode(); - - if (statusCode >= 200 && statusCode < 300) { - - if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { - try { - // We don't support batching ATM and probably - // won't since the next version considers - // removing it. - McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage( - this.jsonMapper, responseEvent.sseEvent().data()); - - Tuple2, Iterable> idWithMessages = Tuples - .of(Optional.ofNullable(responseEvent.sseEvent().id()), - List.of(message)); - - McpTransportStream sessionStream = stream != null ? stream - : new DefaultMcpTransportStream<>(this.resumableStreams, - this::reconnect); - logger.debug("Connected stream {}", sessionStream.streamId()); - - return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); - - } - catch (IOException ioException) { - return Flux.error(new McpTransportException( - "Error parsing JSON-RPC message: " + responseEvent, ioException)); - } - } - else { - logger.debug("Received SSE event with type: {}", responseEvent.sseEvent()); - return Flux.empty(); - } - } - else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed - logger - .debug("The server does not support SSE streams, using request-response mode."); - return Flux.empty(); + var builder = requestBuilder.uri(uri) + .header("Accept", TEXT_EVENT_STREAM) + .header("Cache-Control", "no-cache") + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .GET(); + var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); + return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext)); + }) + .flatMapMany( + requestBuilder -> Flux.create( + sseSink -> this.httpClient + .sendAsync(requestBuilder.build(), + responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, + sseSink)) + .whenComplete((response, throwable) -> { + if (throwable != null) { + sseSink.error(throwable); } - else if (statusCode == NOT_FOUND) { - - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id - // and the response is 404, we consider it a - // session not found error. - logger.debug("Session not found for session ID: {}", - transportSession.sessionId().get()); - String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionIdRepresentation); - return Flux.error(exception); - } - return Flux.error( - new McpTransportException("Server Not Found. Status code:" + statusCode - + ", response-event:" + responseEvent)); + else { + logger.debug("SSE connection established successfully"); } - else if (statusCode == BAD_REQUEST) { - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id - // and thre response is 404, we consider it a - // session not found error. - String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionIdRepresentation); - return Flux.error(exception); - } - return Flux.error( - new McpTransportException("Bad Request. Status code:" + statusCode - + ", response-event:" + responseEvent)); + })) + .map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent) + .flatMap(responseEvent -> { + int statusCode = responseEvent.responseInfo().statusCode(); + + if (statusCode >= 200 && statusCode < 300) { + + if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { + try { + // We don't support batching ATM and probably + // won't since the next version considers + // removing it. + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage( + this.jsonMapper, responseEvent.sseEvent().data()); + + Tuple2, Iterable> idWithMessages = Tuples + .of(Optional.ofNullable(responseEvent.sseEvent().id()), + List.of(message)); + + McpTransportStream sessionStream = stream != null ? stream + : new DefaultMcpTransportStream<>(this.resumableStreams, + this::reconnect); + logger.debug("Connected stream {}", sessionStream.streamId()); + + return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); } + catch (IOException ioException) { + return Flux.error(new McpTransportException( + "Error parsing JSON-RPC message: " + responseEvent, ioException)); + } + } + else { + logger.debug("Received SSE event with type: {}", responseEvent.sseEvent()); + return Flux.empty(); + } + } + else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed + logger + .debug("The server does not support SSE streams, using request-response mode."); + return Flux.empty(); + } + else if (statusCode == NOT_FOUND) { + + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id + // and the response is 404, we consider it a + // session not found error. + logger.debug("Session not found for session ID: {}", + transportSession.sessionId().get()); + String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionIdRepresentation); + return Flux.error(exception); + } + return Flux.error( + new McpTransportException("Server Not Found. Status code:" + statusCode + + ", response-event:" + responseEvent)); + } + else if (statusCode == BAD_REQUEST) { + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id + // and thre response is 404, we consider it a + // session not found error. + String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession); + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionIdRepresentation); + return Flux.error(exception); + } + return Flux.error( + new McpTransportException("Bad Request. Status code:" + statusCode + + ", response-event:" + responseEvent)); + + } - return Flux.error(new McpTransportException( - "Received unrecognized SSE event type: " + responseEvent.sseEvent().event())); - }).flatMap( + return Flux.error(new McpTransportException( + "Received unrecognized SSE event type: " + responseEvent.sseEvent().event())); + }).flatMap( jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) - .onErrorMap(CompletionException.class, t -> t.getCause()) - .onErrorComplete(t -> { - this.handleException(t); - return true; - }) - .doFinally(s -> { - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); - } - })) - .contextWrite(ctx) - .subscribe(); + .onErrorMap(CompletionException.class, t -> t.getCause()) + .onErrorComplete(t -> { + this.handleException(t); + return true; + }) + .doFinally(s -> { + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + })) + .contextWrite(ctx) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); @@ -412,169 +412,166 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { String jsonBody = this.toString(sentMessage); Disposable connection = Mono.deferContextual(ctx -> { - HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); + HttpRequest.Builder requestBuilder = this.requestBuilder.copy(); - if (transportSession != null && transportSession.sessionId().isPresent()) { - requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, - transportSession.sessionId().get()); - } + if (transportSession != null && transportSession.sessionId().isPresent()) { + requestBuilder = requestBuilder.header(HttpHeaders.MCP_SESSION_ID, + transportSession.sessionId().get()); + } - var builder = requestBuilder.uri(uri) - .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) - .header("Content-Type", APPLICATION_JSON) - .header("Cache-Control", "no-cache") - .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) - .POST(HttpRequest.BodyPublishers.ofString(jsonBody)); - var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); - return Mono - .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext)); - }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { - - // Create the async request with proper body subscriber selection - Mono.fromFuture(this.httpClient - .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) - .whenComplete((response, throwable) -> { - if (throwable != null) { - responseEventSink.error(throwable); - } - else { - logger.debug("SSE connection established successfully"); - } - })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); - - })).flatMap(responseEvent -> { - String mcpSessionId = responseEvent.responseInfo() - .headers() - .firstValue("mcp-session-id") - .orElseGet(() -> null); - if (Objects.nonNull(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { - // Once we have a session, we try to open an async stream for - // the server to send notifications and requests out-of-band. - - reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); + var builder = requestBuilder.uri(uri) + .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) + .header("Content-Type", APPLICATION_JSON) + .header("Cache-Control", "no-cache") + .header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION) + .POST(HttpRequest.BodyPublishers.ofString(jsonBody)); + var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); + return Mono + .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext)); + }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { + + // Create the async request with proper body subscriber selection + Mono.fromFuture(this.httpClient + .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) + .whenComplete((response, throwable) -> { + if (throwable != null) { + responseEventSink.error(throwable); } + else { + logger.debug("SSE connection established successfully"); + } + })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); + + })).flatMap(responseEvent -> { + String mcpSessionId = responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null); + if (Objects.nonNull(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { + // Once we have a session, we try to open an async stream for + // the server to send notifications and requests out-of-band. + + reconnect(null).contextWrite(deliveredSink.contextView()).subscribe(); + } + + String sessionRepresentation = sessionIdOrPlaceholder(transportSession); + + int statusCode = responseEvent.responseInfo().statusCode(); + + if (statusCode >= 200 && statusCode < 300) { + + String contentType = responseEvent.responseInfo() + .headers() + .firstValue("Content-Type") + .orElse("") + .toLowerCase(); + + if (contentType.isBlank()) { + logger.debug("No content type returned for POST in session {}", sessionRepresentation); + // No content type means no response body, so we can just + // return + // an empty stream + deliveredSink.success(); + return Flux.empty(); + } + else if (contentType.contains(TEXT_EVENT_STREAM)) { + return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent()) + .flatMap(sseEvent -> { + try { + // We don't support batching ATM and probably + // won't + // since the + // next version considers removing it. + McpSchema.JSONRPCMessage message = McpSchema + .deserializeJsonRpcMessage(this.jsonMapper, sseEvent.data()); - String sessionRepresentation = sessionIdOrPlaceholder(transportSession); - - int statusCode = responseEvent.responseInfo().statusCode(); - - if (statusCode >= 200 && statusCode < 300) { - - String contentType = responseEvent.responseInfo() - .headers() - .firstValue("Content-Type") - .orElse("") - .toLowerCase(); - - if (contentType.isBlank()) { - logger.debug("No content type returned for POST in session {}", sessionRepresentation); - // No content type means no response body, so we can just - // return - // an empty stream - deliveredSink.success(); - return Flux.empty(); - } - else if (contentType.contains(TEXT_EVENT_STREAM)) { - return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent()) - .flatMap(sseEvent -> { - try { - // We don't support batching ATM and probably - // won't - // since the - // next version considers removing it. - McpSchema.JSONRPCMessage message = McpSchema - .deserializeJsonRpcMessage(this.jsonMapper, sseEvent.data()); - - Tuple2, Iterable> idWithMessages = Tuples - .of(Optional.ofNullable(sseEvent.id()), List.of(message)); - - McpTransportStream sessionStream = new DefaultMcpTransportStream<>( - this.resumableStreams, this::reconnect); - - logger.debug("Connected stream {}", sessionStream.streamId()); - - deliveredSink.success(); - - return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); - } - catch (IOException ioException) { - return Flux.error(new McpTransportException( - "Error parsing JSON-RPC message: " + responseEvent, ioException)); - } - }); - } - else if (contentType.contains(APPLICATION_JSON)) { - deliveredSink.success(); - String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); - if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { - logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data); - return Mono.empty(); - } + Tuple2, Iterable> idWithMessages = Tuples + .of(Optional.ofNullable(sseEvent.id()), List.of(message)); - try { - return Mono.just(McpSchema.deserializeJsonRpcMessage(jsonMapper, data)); + McpTransportStream sessionStream = new DefaultMcpTransportStream<>( + this.resumableStreams, this::reconnect); + + logger.debug("Connected stream {}", sessionStream.streamId()); + + deliveredSink.success(); + + return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages))); } - catch (IOException e) { - return Mono.error(new McpTransportException( - "Error deserializing JSON-RPC message: " + responseEvent, e)); + catch (IOException ioException) { + return Flux.error(new McpTransportException( + "Error parsing JSON-RPC message: " + responseEvent, ioException)); } - } - logger.warn("Unknown media type {} returned for POST in session {}", contentType, - sessionRepresentation); - - return Flux.error( - new RuntimeException("Unknown media type returned: " + contentType)); - } - else if (statusCode == NOT_FOUND) { - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id and the - // response is 404, we consider it a session not found error. - logger.debug("Session not found for session ID: {}", transportSession.sessionId().get()); - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionRepresentation); - return Flux.error(exception); - } - return Flux.error(new McpTransportException( - "Server Not Found. Status code:" + statusCode + ", response-event:" + responseEvent)); - } - else if (statusCode == BAD_REQUEST) { - // Some implementations can return 400 when presented with a - // session id that it doesn't know about, so we will - // invalidate the session - // https://github.com/modelcontextprotocol/typescript-sdk/issues/389 - - if (transportSession != null && transportSession.sessionId().isPresent()) { - // only if the request was sent with a session id and the - // response is 404, we consider it a session not found error. - McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( - "Session not found for session ID: " + sessionRepresentation); - return Flux.error(exception); - } - return Flux.error(new McpTransportException( - "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent)); + }); + } + else if (contentType.contains(APPLICATION_JSON)) { + deliveredSink.success(); + String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); + if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { + logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data); + return Mono.empty(); } - return Flux.error( - new RuntimeException("Failed to send message: " + responseEvent)); - }) - .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) - .onErrorMap(CompletionException.class, t -> t.getCause()) - .onErrorComplete(t -> { - // handle the error first - this.handleException(t); - // inform the caller of sendMessage - deliveredSink.error(t); - return true; - }) - .doFinally(s -> { - logger.debug("SendMessage finally: {}", s); - Disposable ref = disposableRef.getAndSet(null); - if (ref != null) { - transportSession.removeConnection(ref); + try { + return Mono.just(McpSchema.deserializeJsonRpcMessage(jsonMapper, data)); } - }) - .contextWrite(deliveredSink.contextView()) - .subscribe(); + catch (IOException e) { + return Mono.error(new McpTransportException( + "Error deserializing JSON-RPC message: " + responseEvent, e)); + } + } + logger.warn("Unknown media type {} returned for POST in session {}", contentType, + sessionRepresentation); + + return Flux.error( + new RuntimeException("Unknown media type returned: " + contentType)); + } + else if (statusCode == NOT_FOUND) { + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id and the + // response is 404, we consider it a session not found error. + logger.debug("Session not found for session ID: {}", transportSession.sessionId().get()); + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionRepresentation); + return Flux.error(exception); + } + return Flux.error(new McpTransportException( + "Server Not Found. Status code:" + statusCode + ", response-event:" + responseEvent)); + } + else if (statusCode == BAD_REQUEST) { + // Some implementations can return 400 when presented with a + // session id that it doesn't know about, so we will + // invalidate the session + // https://github.com/modelcontextprotocol/typescript-sdk/issues/389 + + if (transportSession != null && transportSession.sessionId().isPresent()) { + // only if the request was sent with a session id and the + // response is 404, we consider it a session not found error. + McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException( + "Session not found for session ID: " + sessionRepresentation); + return Flux.error(exception); + } + return Flux.error(new McpTransportException( + "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent)); + } + + return Flux.error( + new RuntimeException("Failed to send message: " + responseEvent)); + }) + .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) + .onErrorMap(CompletionException.class, t -> t.getCause()) + .onErrorComplete(t -> { + // handle the error first + this.handleException(t); + // inform the caller of sendMessage + deliveredSink.error(t); + return true; + }) + .doFinally(s -> { + logger.debug("SendMessage finally: {}", s); + Disposable ref = disposableRef.getAndSet(null); + if (ref != null) { + transportSession.removeConnection(ref); + } + }) + .contextWrite(deliveredSink.contextView()) + .subscribe(); disposableRef.set(connection); transportSession.addConnection(connection);