From a4474f179ad8c1e3b08b1fc8bcb10da01686b135 Mon Sep 17 00:00:00 2001 From: Georg Lokowandt Date: Mon, 22 Sep 2025 12:53:56 +0200 Subject: [PATCH] Feature: handle rate limiting of UAA server. --- README.md | 3 +- .../reactor/uaa/AbstractUaaOperations.java | 480 +++++++++++++----- .../reactor/uaa/ReactorRatelimit.java | 53 ++ .../cloudfoundry/reactor/uaa/UaaOperator.java | 130 +++++ .../reactor/uaa/UaaThrottler.java | 261 ++++++++++ .../reactor/uaa/_ReactorUaaClient.java | 7 + .../reactor/uaa/_UaaRatelimit.java | 17 + .../util/AbstractReactorOperations.java | 6 +- .../cloudfoundry/reactor/util/Operator.java | 30 +- .../reactor/util/RequestLogger.java | 10 +- .../reactor/AbstractRestTest.java | 4 +- .../reactor/uaa/groups/ReactorGroupsTest.java | 139 ++++- .../fixtures/uaa/groups/GET_429_response.json | 3 + .../java/org/cloudfoundry/uaa/UaaClient.java | 6 + .../cloudfoundry/uaa/ratelimit/Ratelimit.java | 27 + .../cloudfoundry/uaa/ratelimit/_Current.java | 63 +++ .../uaa/ratelimit/_RatelimitRequest.java | 24 + .../uaa/ratelimit/_RatelimitResponse.java | 36 ++ .../IntegrationTestConfiguration.java | 91 +++- .../uaa/UaaRatelimitInitializer.java | 79 +++ 20 files changed, 1284 insertions(+), 185 deletions(-) create mode 100644 cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java create mode 100644 cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java create mode 100644 cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java create mode 100644 cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java create mode 100644 cloudfoundry-client-reactor/src/test/resources/fixtures/uaa/groups/GET_429_response.json create mode 100644 cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java create mode 100644 cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java create mode 100644 cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java create mode 100644 cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java create mode 100644 integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java diff --git a/README.md b/README.md index ffed2f36f4e..58bb8269b33 100644 --- a/README.md +++ b/README.md @@ -253,7 +253,7 @@ Beyond that, it is helpful to capture the following information: If you open a Github issue with a request for help, please include as much of the information above as possible and do not forget to sanitize any request/response data posted. ## Development -The project depends on Java 8. To build from source and install to your local Maven cache, run the following: +The project depends on Java 8 to 21. To build from source and install to your local Maven cache, run the following: ```shell $ git submodule update --init --recursive @@ -297,6 +297,7 @@ Name | Description `TEST_PROXY_PORT` | _(Optional)_ The port of a proxy to route all requests through. Defaults to `8080`. `TEST_PROXY_USERNAME` | _(Optional)_ The username for a proxy to route all requests through `TEST_SKIPSSLVALIDATION` | _(Optional)_ Whether to skip SSL validation when connecting to the Cloud Foundry instance. Defaults to `false`. +`UAA_API_REQUEST_LIMIT` | _(Optional)_ If your UAA server does rate limiting and returns 429 errors, set this variable to a value smaller than the limit. Defaults to `0` (no limit) If you do not have access to a CloudFoundry instance with admin access, you can run one locally using [bosh-deployment](https://github.com/cloudfoundry/bosh-deployment) & [cf-deployment](https://github.com/cloudfoundry/cf-deployment/) and Virtualbox. diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java index d4ab1ab9d2d..5b91802e510 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java @@ -24,6 +24,7 @@ import org.cloudfoundry.reactor.ConnectionContext; import org.cloudfoundry.reactor.TokenProvider; import org.cloudfoundry.reactor.client.QueryBuilder; +import org.cloudfoundry.reactor.uaa.UaaThrottler.ResourceToken; import org.cloudfoundry.reactor.util.AbstractReactorOperations; import org.cloudfoundry.reactor.util.ErrorPayloadMappers; import org.cloudfoundry.reactor.util.Operator; @@ -43,62 +44,114 @@ protected AbstractUaaOperations( super(connectionContext, root, tokenProvider, requestTags); } - @Override - protected Mono createOperator() { - return super.createOperator().map(this::attachErrorPayloadMapper); + private Mono createOperator(ResourceToken token) { + return this.root + .map(super::buildOperatorContext) + .flatMap( + context -> + Mono.just( + new UaaOperator( + context, + this.connectionContext.getHttpClient(), + token, + "")) + .map(op -> op.headers(super::addHeaders)) + .map(op -> op.headersWhen(super::addHeadersWhen))) + .map(this::attachErrorPayloadMapper); } protected final Mono delete( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> delete(requestPayload, responseType, uriTransformer, token)); + } + + private Mono delete( + Object requestPayload, + Class responseType, + Function uriTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .delete() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .delete() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono get( Object requestPayload, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> get(requestPayload, uriTransformer, token)); + } + + private Mono get( + Object requestPayload, + Function uriTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .get()); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .get(); + }); } protected final Mono get( Object requestPayload, Function uriTransformer, Consumer headersTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> get(requestPayload, uriTransformer, headersTransformer, token)); + } + + private Mono get( + Object requestPayload, + Function uriTransformer, + Consumer headersTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .get()); + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .get(); + }); } protected final Mono get( @@ -106,38 +159,74 @@ protected final Mono get( Function uriTransformer, Consumer headersTransformer, Function> headersWhenTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .headersWhen(headersWhenTransformer) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .get()); + token -> + get( + requestPayload, + uriTransformer, + headersTransformer, + headersWhenTransformer, + token)); + } + + private Mono get( + Object requestPayload, + Function uriTransformer, + Consumer headersTransformer, + Function> headersWhenTransformer, + ResourceToken token) { + return createOperator(token) + .flatMap( + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .headersWhen(headersWhenTransformer) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .get(); + }); } protected final Mono get( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> get(requestPayload, responseType, uriTransformer, token)); + } + + private Mono get( + Object requestPayload, + Class responseType, + Function uriTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .parseBody(responseType); + }); } protected final Mono get( @@ -145,38 +234,74 @@ protected final Mono get( Class responseType, Function uriTransformer, Consumer headersTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap( + token -> + get( + requestPayload, + responseType, + uriTransformer, + headersTransformer, + token)); + } + + private Mono get( + Object requestPayload, + Class responseType, + Function uriTransformer, + Consumer headersTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .parseBody(responseType); + }); } protected final Mono patch( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> patch(requestPayload, responseType, uriTransformer, token)); + } + + private Mono patch( + Object requestPayload, + Class responseType, + Function uriTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .patch() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .patch() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono post( @@ -184,22 +309,44 @@ protected final Mono post( Class responseType, Function uriTransformer, Consumer headersTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap( + token -> + post( + requestPayload, + responseType, + uriTransformer, + headersTransformer, + token)); + } + + private Mono post( + Object requestPayload, + Class responseType, + Function uriTransformer, + Consumer headersTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .post() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .post() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono post( @@ -208,57 +355,109 @@ protected final Mono post( Function uriTransformer, Consumer headersTransformer, Function> headersWhenTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .headersWhen(headersWhenTransformer) - .post() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + token -> + post( + requestPayload, + responseType, + uriTransformer, + headersTransformer, + headersWhenTransformer, + token)); + } + + private Mono post( + Object requestPayload, + Class responseType, + Function uriTransformer, + Consumer headersTransformer, + Function> headersWhenTransformer, + ResourceToken token) { + return createOperator(token) + .flatMap( + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .headersWhen(headersWhenTransformer) + .post() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono post( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> post(requestPayload, responseType, uriTransformer, token)); + } + + private Mono post( + Object requestPayload, + Class responseType, + Function uriTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .post() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .post() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono put( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> put(requestPayload, responseType, uriTransformer, token)); + } + + private Mono put( + Object requestPayload, + Class responseType, + Function uriTransformer, + ResourceToken token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .put() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .put() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } private static void addHeaders( @@ -274,9 +473,14 @@ private static void addHeaders(HttpHeaders httpHeaders, Object requestPayload) { VersionBuilder.augment(httpHeaders, requestPayload); } - private Operator attachErrorPayloadMapper(Operator operator) { - return operator.withErrorPayloadMapper( - ErrorPayloadMappers.uaa(this.connectionContext.getObjectMapper())); + private UaaOperator attachErrorPayloadMapper(Operator operator) { + if (operator instanceof UaaOperator) { + UaaOperator op = (UaaOperator) operator; + return op.withErrorPayloadMapper( + ErrorPayloadMappers.uaa(this.connectionContext.getObjectMapper())); + } else { + throw new RuntimeException("Wrong class of operator " + operator.getClass().toString()); + } } private Function queryTransformer( diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java new file mode 100644 index 00000000000..c350367d611 --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java @@ -0,0 +1,53 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.uaa; + +import java.util.Map; +import org.cloudfoundry.reactor.ConnectionContext; +import org.cloudfoundry.reactor.TokenProvider; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; +import org.cloudfoundry.uaa.ratelimit.RatelimitRequest; +import org.cloudfoundry.uaa.ratelimit.RatelimitResponse; +import reactor.core.publisher.Mono; + +public final class ReactorRatelimit extends AbstractUaaOperations implements Ratelimit { + + /** + * Creates an instance + * + * @param connectionContext the {@link ConnectionContext} to use when communicating with the server + * @param root the root URI of the server. Typically something like {@code https://uaa.run.pivotal.io}. + * @param tokenProvider the {@link TokenProvider} to use when communicating with the server + * @param requestTags map with custom http headers which will be added to web request + */ + public ReactorRatelimit( + ConnectionContext connectionContext, + Mono root, + TokenProvider tokenProvider, + Map requestTags) { + super(connectionContext, root, tokenProvider, requestTags); + } + + @Override + public Mono getRatelimit(RatelimitRequest request) { + return get( + request, + RatelimitResponse.class, + builder -> builder.pathSegment("RateLimitingStatus")) + .checkpoint(); + } +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java new file mode 100644 index 00000000000..2c32bcdb0ca --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.uaa; + +import io.netty.handler.codec.http.HttpHeaders; +import java.util.function.Consumer; +import java.util.function.Function; +import org.cloudfoundry.reactor.uaa.UaaThrottler.ResourceToken; +import org.cloudfoundry.reactor.util.ErrorPayloadMapper; +import org.cloudfoundry.reactor.util.Operator; +import org.cloudfoundry.reactor.util.OperatorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +public class UaaOperator extends Operator { + + private ResourceToken token = null; + private static final Logger LOGGER = LoggerFactory.getLogger("cloudfoundry-client.test"); + + public UaaOperator( + OperatorContext context, HttpClient httpClient, ResourceToken value, String caller) { + super(context, httpClient); + token = value; + if (token != UaaThrottler.NON_UAA_TOKEN) { + LOGGER.debug("UaaOperator creating instance for " + value.id()); + } + } + + @Override + public UaaOperator followRedirects() { + UaaOperator result = + new UaaOperator( + this.context, + super.getHttpClient().followRedirect(true), + this.token, + "follow"); + if (this.token != null) { + result.setToken(token); + } + return result; + } + + @Override + public UaaOperator headers(Consumer headersTransformer) { + UaaOperator result = + new UaaOperator( + this.context, + super.getHttpClient().headers(headersTransformer), + this.token, + "headers"); + if (this.token != null) { + result.setToken(token); + } + return result; + } + + @Override + public UaaOperator headersWhen( + Function> headersWhenTransformer) { + UaaOperator result = + new UaaOperator( + this.context, + super.getHttpClient().headersWhen(headersWhenTransformer), + this.token, + "headersWhen"); + if (this.token != null) { + result.setToken(token); + } + return result; + } + + @Override + public UaaOperator withErrorPayloadMapper(ErrorPayloadMapper errorPayloadMapper) { + UaaOperator result = + new UaaOperator( + this.context.withErrorPayloadMapper(errorPayloadMapper), + super.getHttpClient(), + this.token, + "errorPayload"); + if (this.token != null) { + result.setToken(token); + } + return result; + } + + @Override + protected HttpClient attachRequestLogger(HttpClient httpClient) { + return super.attachRequestLogger(httpClient) + .doAfterResponseSuccess((response, connection) -> releaseToken()) + .doOnResponseError((response, connection) -> releaseToken()); + } + + public UaaOperator setToken(ResourceToken value) { + if (this.token != null) { + if (this.token == value) { + // not needed, no harm is done. + } else { + LOGGER.error( + "UaaOperator replacing token with different value. Old: " + + this.token + + " new: " + + value); + } + } + this.token = value; + return this; + } + + private void releaseToken() { + if (this.token != null) { + token.release(); + } + } +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java new file mode 100644 index 00000000000..911c0c6facf --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java @@ -0,0 +1,261 @@ +package org.cloudfoundry.reactor.uaa; + +import java.time.Duration; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +/** + * Throttle calls to uaa in order to avoid running into a rate limit. + * If your UAA server is configured with a rate limit, the number of allowed parallel requests + * must be set here in order to slow down the client and avoid http 429-responses. + * + * @author D034003 + * + */ +public class UaaThrottler { + public static final ResourceToken NON_UAA_TOKEN = ResourceToken.empty(); + private static UaaThrottler instance = null; + private static int maxDelay = 8; + private static int maxResources = 0; + private AtomicInteger inUse; + private Queue waitingQueue; + + private static final Logger LOGGER = LoggerFactory.getLogger("cloudfoundry-client.test"); + + private static Consumer timeoutHandler = ex -> ex.printStackTrace(); + + public static UaaThrottler getInstance() { + if (instance == null) { + instance = new UaaThrottler(); + } + return instance; + } + + private UaaThrottler() { + inUse = new AtomicInteger(0); + waitingQueue = new ConcurrentLinkedQueue<>(); + } + + /** + * This method should only be used to get a defined state in test coding. + */ + public static void reset() { + instance = null; + } + + private String logString() { + return "maxResources " + + maxResources + + " inUse " + + inUse.get() + + " queue " + + waitingQueue.size(); + } + + /** + * Here the number of parallel requests can be configured. Must be less than the value returned by + * the `RateLimitingStatus` endpoint of your UAA. + * Default is `0`, meaning no limit. + * @param rateLimit + */ + public static void setUaaRateLimit(int rateLimit) { + maxResources = rateLimit; + } + + /** + * Configure the time in seconds until a request is regarded to be lost. After this time, requests will not be canceled or aborted, but they will not count to the limit and new requests may start. + * + * @param max time in seconds. Default is 8 seconds. + */ + public static void setMaxDelay(int max) { + maxDelay = max; + } + + public Mono acquire(String url) { + if ((maxResources > 0)) { + LOGGER.debug( + "UaaThrottler about to acquire one token " + + this.logString() + + " for url " + + url); + return Mono.defer( + () -> { + if (inUse.incrementAndGet() < maxResources) { + // Slot available + ResourceToken token = + new ResourceToken( + url, + timeoutHandler, + new TimeoutException( + "Requests block each other and time" + + " out")); + LOGGER.debug( + "UaaThrottler created one token " + + this.logString() + + " for url " + + url + + " token " + + token); + return Mono.just(token); + } else { + Sinks.One sink = Sinks.one(); + waitingQueue.offer(new SinkWithId(url, sink)); + LOGGER.debug( + "UaaThrottler created sink " + + this.logString() + + " for url " + + url); + return sink.asMono(); + } + }) + .cache(); + } else { + return Mono.just(NON_UAA_TOKEN); + } + } + + private void release(ResourceToken token) { + String url = token.id; + if ((maxResources > 0)) { + LOGGER.debug( + "UaaThrottler about to release token " + + this.logString() + + " released " + + token.released.get() + + " for url " + + url + + " token " + + token); + SinkWithId urlSink = waitingQueue.poll(); + if (urlSink != null) { + Mono.delay(Duration.ofMillis(1)) + .subscribe( + __ -> { + ResourceToken newToken = + new ResourceToken( + urlSink.id, + timeoutHandler, + new TimeoutException( + "Requests block each other and time" + + " out")); + LOGGER.debug( + "UaaThrottler releasing sink and creating token " + + this.logString() + + " released " + + token.released.get() + + " oldUrl " + + token.id + + " newUrl " + + newToken.id + + " oldToken " + + token + + " newToken " + + newToken); + inUse.decrementAndGet(); + urlSink.sink.tryEmitValue(newToken); + }); + } else { + inUse.decrementAndGet(); + LOGGER.debug( + "UaThrottler completed release. " + + this.logString() + + " for url " + + url + + " token" + + token); + } + } + } + + /** + * By default, a message and callstack is written to stderr in case a request does not return within maxDelay seconds. + * Here different behavior can be configured. + * + * @param timeoutHandler + */ + public static void setTimeoutHandler(Consumer timeoutHandler) { + UaaThrottler.timeoutHandler = Objects.requireNonNull(timeoutHandler); + } + + public static class ResourceToken { + private final String id; + private final ScheduledFuture leakTask; + private final AtomicBoolean released = new AtomicBoolean(false); + private UaaThrottler instance; + + private ResourceToken( + String id, + Consumer timeoutHandler, + TimeoutException tokenAquiredAt) { + this.id = id; + this.instance = UaaThrottler.instance; + if (timeoutHandler == null) { // this is the null-object + this.leakTask = null; + } else { + this.leakTask = + Executors.newSingleThreadScheduledExecutor() + .schedule( + () -> { + if (!released.get()) { + LOGGER.error( + "[LEAK DETECTED] UaaThrottler.ResourceToken" + + " was not released within " + + maxDelay + + " seconds " + + this.id + + " token " + + this); + this.release(); + timeoutHandler.accept(tokenAquiredAt); + } + }, + maxDelay, + TimeUnit.SECONDS); + } + } + + private static ResourceToken empty() { + return new ResourceToken(null, null, null); + } + + public void release() { + if (this != NON_UAA_TOKEN) { + if (released.compareAndSet(false, true)) { + leakTask.cancel(true); + Mono.delay(Duration.ofMillis(1)) + .doOnNext(__ -> this.instance.release(this)) + .subscribe(); + } else { + // If a request times out, the token gets released. When the response comes + // later, the token is already released and must not be released a second time. + } + } + } + + public String id() { + return id; + } + } + + private class SinkWithId { + final String id; + final Sinks.One sink; + + private SinkWithId(String idForLogmessages, Sinks.One sink) { + this.id = idForLogmessages; + this.sink = sink; + } + } +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java index 1b2d613c4e7..a1b28ca8489 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java @@ -33,6 +33,7 @@ import org.cloudfoundry.uaa.groups.Groups; import org.cloudfoundry.uaa.identityproviders.IdentityProviders; import org.cloudfoundry.uaa.identityzones.IdentityZones; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; import org.cloudfoundry.uaa.serverinformation.ServerInformation; import org.cloudfoundry.uaa.tokens.Tokens; import org.cloudfoundry.uaa.users.Users; @@ -104,6 +105,12 @@ public Users users() { return new ReactorUsers(getConnectionContext(), getRoot(), getTokenProvider(), getRequestTags()); } + @Override + @Value.Derived + public Ratelimit rateLimit() { + return new ReactorRatelimit(getConnectionContext(), getRoot(), getTokenProvider(), getRequestTags()); + } + /** * The connection context */ diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java new file mode 100644 index 00000000000..054620e752c --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java @@ -0,0 +1,17 @@ +package org.cloudfoundry.reactor.uaa; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import org.cloudfoundry.Nullable; +import org.immutables.value.Value; + +@JsonDeserialize +@Value.Immutable +abstract class _UaaRatelimit { + + @JsonProperty("limiterMappings") + @Nullable + public abstract Integer getRatelimit(); + + +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java index ec97861f60c..f80486807c8 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java @@ -58,19 +58,19 @@ protected Mono createOperator() { .map(operator -> operator.headersWhen(this::addHeadersWhen)); } - private void addHeaders(HttpHeaders httpHeaders) { + public void addHeaders(HttpHeaders httpHeaders) { UserAgent.setUserAgent(httpHeaders); JsonCodec.setDecodeHeaders(httpHeaders); this.requestTags.forEach(httpHeaders::set); } - private Mono addHeadersWhen(HttpHeaders httpHeaders) { + public Mono addHeadersWhen(HttpHeaders httpHeaders) { return this.tokenProvider .getToken(this.connectionContext) .map(token -> httpHeaders.set(AUTHORIZATION, token)); } - private OperatorContext buildOperatorContext(String root) { + public OperatorContext buildOperatorContext(String root) { return OperatorContext.builder() .connectionContext(this.connectionContext) .root(root) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java index bc069123269..ba7ae430529 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java @@ -61,6 +61,10 @@ public UriConfiguration delete() { return request(HttpMethod.DELETE); } + protected HttpClient getHttpClient() { + return this.httpClient; + } + public Operator followRedirects() { return new Operator(this.context, this.httpClient.followRedirect(true)); } @@ -104,7 +108,7 @@ public Operator withErrorPayloadMapper(ErrorPayloadMapper errorPayloadMapper) { this.context.withErrorPayloadMapper(errorPayloadMapper), this.httpClient); } - private static HttpClient attachRequestLogger(HttpClient httpClient) { + protected HttpClient attachRequestLogger(HttpClient httpClient) { RequestLogger requestLogger = new RequestLogger(); return httpClient .doAfterRequest((request, connection) -> requestLogger.request(request)) @@ -129,18 +133,30 @@ public ResponseReceiverConstructor send(Object payload) { return send(serialized(payload)); } + private BiFunction> + wrapRequestTransformer( + BiFunction> original) { + return (req, out) -> Mono.from(original.apply(req, out)); + } + public ResponseReceiverConstructor send( BiFunction> requestTransformer) { - HttpClient.ResponseReceiver responseReceiver = - this.requestSender.send(requestTransformer); - return new ResponseReceiverConstructor(this.context, responseReceiver); + return new ResponseReceiverConstructor( + this.context, + this.requestSender.send(wrapRequestTransformer(requestTransformer))); + } + + BiConsumer wrapFormTransformer( + BiConsumer original) { + + return (req, form) -> original.accept(req, form); } public ResponseReceiverConstructor sendForm( BiConsumer requestTransformer) { - HttpClient.ResponseReceiver responseReceiver = - this.requestSender.sendForm(requestTransformer); - return new ResponseReceiverConstructor(this.context, responseReceiver); + return new ResponseReceiverConstructor( + this.context, + this.requestSender.sendForm(wrapFormTransformer(requestTransformer))); } private BiFunction> serialized( diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java index d0f3cc5ae3a..0acf846658c 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java @@ -34,7 +34,7 @@ public class RequestLogger { private long requestSentTime; public void request(HttpClientRequest request) { - request(String.format("%-6s {}", request.method()), request.uri()); + request(String.format("%-6s {}", request.method()), request.resourceUrl()); } public void response(HttpClientResponse response) { @@ -50,19 +50,19 @@ public void response(HttpClientResponse response) { RESPONSE_LOGGER.debug( "{} {} ({}, {})", response.status().code(), - response.uri(), + response.resourceUrl(), elapsed, response.responseHeaders().get("X-Vcap-Request-Id")); } else { RESPONSE_LOGGER.debug( - "{} {} ({})", response.status().code(), response.uri(), elapsed); + "{} {} ({})", response.status().code(), response.resourceUrl(), elapsed); } } else { if (RESPONSE_LOGGER.isTraceEnabled()) { RESPONSE_LOGGER.warn( "{} {} ({}, {}) [{}]", response.status().code(), - response.uri(), + response.resourceUrl(), elapsed, response.responseHeaders().get("X-Vcap-Request-Id"), String.join(", ", warnings)); @@ -70,7 +70,7 @@ public void response(HttpClientResponse response) { RESPONSE_LOGGER.warn( "{} {} ({}) [{}]", response.status().code(), - response.uri(), + response.resourceUrl(), elapsed, String.join(", ", warnings)); } diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java index f3f3ad3e1af..fbb1f4d29f3 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java @@ -53,7 +53,7 @@ public abstract class AbstractRestTest { protected final Mono root; - final MockWebServer mockWebServer; + protected final MockWebServer mockWebServer; private MultipleRequestDispatcher multipleRequestDispatcher = new MultipleRequestDispatcher(); @@ -78,7 +78,7 @@ protected final void mockRequest(InteractionContext interactionContext) { this.multipleRequestDispatcher.add(interactionContext); } - private static final class FailingDeserializationProblemHandler + public static final class FailingDeserializationProblemHandler extends DeserializationProblemHandler { @Override diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java index 2275afcccde..b12c3aeea8a 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java @@ -22,14 +22,21 @@ import static io.netty.handler.codec.http.HttpMethod.PUT; import static io.netty.handler.codec.http.HttpResponseStatus.CREATED; import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; import static org.cloudfoundry.uaa.SortOrder.ASCENDING; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import org.cloudfoundry.reactor.InteractionContext; import org.cloudfoundry.reactor.TestRequest; import org.cloudfoundry.reactor.TestResponse; import org.cloudfoundry.reactor.uaa.AbstractUaaApiTest; +import org.cloudfoundry.reactor.uaa.UaaThrottler; import org.cloudfoundry.uaa.Metadata; import org.cloudfoundry.uaa.groups.AddMemberRequest; import org.cloudfoundry.uaa.groups.AddMemberResponse; @@ -66,14 +73,22 @@ import org.cloudfoundry.uaa.users.Email; import org.cloudfoundry.uaa.users.Meta; import org.cloudfoundry.uaa.users.Name; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; final class ReactorGroupsTest extends AbstractUaaApiTest { private final ReactorGroups groups = new ReactorGroups( - CONNECTION_CONTEXT, this.root, TOKEN_PROVIDER, Collections.emptyMap()); + CONNECTION_CONTEXT, super.root, TOKEN_PROVIDER, Collections.emptyMap()); + + @BeforeEach + void clean() { + UaaThrottler.reset(); + } @Test void addMember() { @@ -290,6 +305,128 @@ void get() { .verify(Duration.ofSeconds(5)); } + @Test + void exhaustUaaThrottlerTest() { + @SuppressWarnings("unchecked") + Consumer mockHandler = mock(Consumer.class); + GetGroupResponse resp1Body = + GetGroupResponse.builder() + .id("test-group-id") + .metadata( + Metadata.builder() + .created("2016-06-03T17:59:30.527Z") + .lastModified("2016-06-03T17:59:30.561Z") + .version(1) + .build()) + .description("the cool group") + .displayName("Cooler Group Name for Retrieve") + .member( + MemberSummary.builder() + .origin("uaa") + .type(MemberType.USER) + .memberId("f0e6a061-6e3a-4be9-ace5-142ee24e20b7") + .build()) + .schema("urn:scim:schemas:core:1.0") + .zoneId("uaa") + .build(); + mockRequest( + InteractionContext.builder() + .request( + TestRequest.builder() + .method(GET) + .path("/Groups/test-group-id1") + .build()) + .response( + TestResponse.builder() + .status(OK) + .payload("fixtures/uaa/groups/GET_{id}_response.json") + .build()) + .build()); + mockRequest( + InteractionContext.builder() + .request( + TestRequest.builder() + .method(GET) + .path("/Groups/test-group-id2") + .build()) + .response( + TestResponse.builder() + .status(OK) + .payload("fixtures/uaa/groups/GET_{id}_response.json") + .build()) + .build()); + mockRequest( + InteractionContext.builder() + .request( + TestRequest.builder() + .method(GET) + .path("/Groups/test-group-id3") + .build()) + .response( + TestResponse.builder() + .status(OK) + .payload("fixtures/uaa/groups/GET_{id}_response.json") + .build()) + .build()); + + UaaThrottler.setUaaRateLimit(2); + UaaThrottler.setMaxDelay(6); + UaaThrottler.setTimeoutHandler(mockHandler); + UaaThrottler.getInstance(); + + Mono resp1 = + groups.get(GetGroupRequest.builder().groupId("test-group-id1").build()) + .delayElement(Duration.ofMillis(200)) + .doOnSubscribe(s -> System.out.println("Subscribing resp1")) + .doOnNext(v -> System.out.println("resp1 emits: " + v)); + Mono resp2 = + groups.get(GetGroupRequest.builder().groupId("test-group-id2").build()) + .delayElement(Duration.ofMillis(100)) + .doOnSubscribe(s -> System.out.println("Subscribing resp2")) + .doOnNext(v -> System.out.println("resp2 emits: " + v)); + Mono resp3 = + groups.get(GetGroupRequest.builder().groupId("test-group-id3").build()) + .delayElement(Duration.ofMillis(400)) + .doOnSubscribe(s -> System.out.println("Subscribing resp3")) + .doOnNext(v -> System.out.println("resp3 emits: " + v)); + + Flux merged = Flux.concat(resp1, resp2, resp3); + + StepVerifier.create(merged) + .expectNext(resp1Body) // 100ms + .expectNext(resp1Body) // 200ms + .expectNext(resp1Body) // 400ms + .verifyComplete(); + org.mockito.Mockito.verify(mockHandler, never()).accept(any(TimeoutException.class)); + } + + @Test + void getFailingWithRateLimit() { + mockRequest( + InteractionContext.builder() + .request( + TestRequest.builder() + .method(GET) + .path("/Groups/test-group-id") + .build()) + .response( + TestResponse.builder() + .status(TOO_MANY_REQUESTS) + .payload("fixtures/uaa/groups/GET_429_response.json") + .build()) + .build()); + + UaaThrottler.setUaaRateLimit(2); + UaaThrottler.setMaxDelay(6); + this.groups + .get(GetGroupRequest.builder().groupId("test-group-id").build()) + .as(StepVerifier::create) + .expectErrorMessage( + "429 - Too Many Request - Request limited by Rate Limiter configuration:" + + " Name: SCIM Type: CredentialsID: null") + .verify(Duration.ofSeconds(60)); + } + @Test void list() { mockRequest( diff --git a/cloudfoundry-client-reactor/src/test/resources/fixtures/uaa/groups/GET_429_response.json b/cloudfoundry-client-reactor/src/test/resources/fixtures/uaa/groups/GET_429_response.json new file mode 100644 index 00000000000..798e7c4a5b6 --- /dev/null +++ b/cloudfoundry-client-reactor/src/test/resources/fixtures/uaa/groups/GET_429_response.json @@ -0,0 +1,3 @@ +{ + "error":"429 - Too Many Request - Request limited by Rate Limiter configuration: Name: SCIM Type: CredentialsID" +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java index 9c88d37f592..665fb920ee1 100644 --- a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java @@ -21,6 +21,7 @@ import org.cloudfoundry.uaa.groups.Groups; import org.cloudfoundry.uaa.identityproviders.IdentityProviders; import org.cloudfoundry.uaa.identityzones.IdentityZones; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; import org.cloudfoundry.uaa.serverinformation.ServerInformation; import org.cloudfoundry.uaa.tokens.Tokens; import org.cloudfoundry.uaa.users.Users; @@ -80,4 +81,9 @@ public interface UaaClient { * Main entry point to the UAA User Client API */ Users users(); + + /** + * Main entry point to the UAA Ratelimit API + */ + Ratelimit rateLimit(); } diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java new file mode 100644 index 00000000000..c277fa7331d --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + +import reactor.core.publisher.Mono; + +/** + * Main entry point to the UAA Ratelimit Client API + */ +public interface Ratelimit { + + Mono getRatelimit(RatelimitRequest request); +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java new file mode 100644 index 00000000000..1cc7ccc0ed7 --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java @@ -0,0 +1,63 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import java.util.Date; + +import org.immutables.value.Value; + +/** + * The payload for the uaa ratelimiting + */ +@JsonDeserialize +@Value.Immutable +abstract class _Current { + + /** + * The limit value + */ + @JsonProperty("limiterMappings") + abstract Integer getLimit(); + + /** + * Is ratelimit "ACTIVE" or not? Possible values are DISABLED, PENDING, ACTIVE + */ + @JsonProperty("status") + abstract String getStatus(); + + /** + * Timestamp, when this Current was created. + */ + @JsonProperty("asOf") + abstract Date getTimeOfCurrent(); + + /** + * The credentialIdExtractor + */ + @JsonProperty("credentialIdExtractor") + abstract String getCredentialIdExtractor(); + + /** + * The loggingLevel. Valid values include: "OnlyLimited", "AllCalls" and "AllCallsWithDetails" + */ + @JsonProperty("loggingLevel") + abstract String getLoggingLevel(); +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java new file mode 100644 index 00000000000..be75d58bfa4 --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java @@ -0,0 +1,24 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + +import org.immutables.value.Value; + +@Value.Immutable +abstract class _RatelimitRequest { + +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java new file mode 100644 index 00000000000..0fe927cbf60 --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java @@ -0,0 +1,36 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.cloudfoundry.Nullable; +import org.immutables.value.Value; + +@JsonDeserialize +@Value.Immutable +abstract class _RatelimitResponse { + + @JsonProperty("current") + @Nullable + abstract Current getCurrentData(); + + @JsonProperty("fromSource") + @Nullable + abstract String getFromSource(); + +} diff --git a/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java b/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java index c0b3f44b30d..a42a71798b6 100644 --- a/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java +++ b/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java @@ -65,6 +65,7 @@ import org.cloudfoundry.reactor.uaa.ReactorUaaClient; import org.cloudfoundry.routing.RoutingClient; import org.cloudfoundry.uaa.UaaClient; +import org.cloudfoundry.uaa.UaaRatelimitInitializer; import org.cloudfoundry.uaa.clients.CreateClientRequest; import org.cloudfoundry.uaa.groups.AddMemberRequest; import org.cloudfoundry.uaa.groups.CreateGroupRequest; @@ -73,6 +74,7 @@ import org.cloudfoundry.uaa.groups.ListGroupsRequest; import org.cloudfoundry.uaa.groups.ListGroupsResponse; import org.cloudfoundry.uaa.groups.MemberType; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; import org.cloudfoundry.uaa.users.CreateUserRequest; import org.cloudfoundry.uaa.users.CreateUserResponse; import org.cloudfoundry.uaa.users.Email; @@ -209,7 +211,9 @@ ReactorUaaClient adminUaaClient( @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono> client( - @Qualifier("admin") UaaClient uaaClient, String clientId, String clientSecret) { + @Qualifier("admin") UaaClient uaaClient, + @Qualifier("clientId") String clientId, + @Qualifier("clientSecret") String clientSecret) { return uaaClient .clients() .create( @@ -243,17 +247,19 @@ String clientSecret(NameFactory nameFactory) { } @Bean + @DependsOn("uaaRateLimitInitializer") CloudFoundryCleaner cloudFoundryCleaner( @Qualifier("admin") CloudFoundryClient cloudFoundryClient, NameFactory nameFactory, @Qualifier("admin") NetworkingClient networkingClient, - Version serverVersion, + @Qualifier("serverVersion") Version serverVersion, @Qualifier("admin") UaaClient uaaClient) { return new CloudFoundryCleaner( cloudFoundryClient, nameFactory, networkingClient, serverVersion, uaaClient); } @Bean + @Qualifier("nonAdmin") ReactorCloudFoundryClient cloudFoundryClient( ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorCloudFoundryClient.builder() @@ -264,13 +270,13 @@ ReactorCloudFoundryClient cloudFoundryClient( @Bean DefaultCloudFoundryOperations cloudFoundryOperations( - CloudFoundryClient cloudFoundryClient, + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, DopplerClient dopplerClient, - NetworkingClient networkingClient, + @Qualifier("nonAdmin") NetworkingClient networkingClient, RoutingClient routingClient, - UaaClient uaaClient, - String organizationName, - String spaceName) { + @Qualifier("nonAdmin") UaaClient uaaClient, + @Qualifier("organizationName") String organizationName, + @Qualifier("spaceName") String spaceName) { return DefaultCloudFoundryOperations.builder() .cloudFoundryClient(cloudFoundryClient) .dopplerClient(dopplerClient) @@ -315,10 +321,24 @@ DefaultConnectionContext connectionContext( connectionContext.proxyConfiguration(proxyConfiguration.build()); } - return connectionContext.build(); } + @Bean + public UaaRatelimitInitializer uaaRateLimitInitializer( + Ratelimit ratelimitService, + @Value("${uaa.api.request.limit:#{null}}") Integer commandlineRequestLimit) { + return new UaaRatelimitInitializer(ratelimitService, commandlineRequestLimit); + } + + @Bean + Ratelimit uaaRatelimit( + ConnectionContext connectionContext, + @Value("${test.admin.clientId}") String clientId, + @Value("${test.admin.clientSecret}") String clientSecret) { + return adminUaaClient(connectionContext, clientId, clientSecret).rateLimit(); + } + @Bean DopplerClient dopplerClient(ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorDopplerClient.builder() @@ -344,7 +364,9 @@ RandomNameFactory nameFactory() { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono metricRegistrarServiceInstance( - CloudFoundryClient cloudFoundryClient, Mono spaceId, NameFactory nameFactory) { + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("spaceId") Mono spaceId, + NameFactory nameFactory) { return spaceId.flatMap( spaceIdValue -> cloudFoundryClient @@ -360,6 +382,7 @@ Mono metricRegistrarServiceInstance( } @Bean + @Qualifier("nonAdmin") NetworkingClient networkingClient( ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorNetworkingClient.builder() @@ -371,10 +394,10 @@ NetworkingClient networkingClient( @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono organizationId( - CloudFoundryClient cloudFoundryClient, - String organizationName, - String organizationQuotaName, - Mono userId) { + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("organizationName") String organizationName, + @Qualifier("organizationQuotaName") String organizationQuotaName, + @Qualifier("userId") Mono userId) { return userId.flatMap( userId1 -> cloudFoundryClient @@ -468,12 +491,12 @@ Version serverVersion(@Qualifier("admin") CloudFoundryClient cloudFoundryClient) @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono serviceBrokerId( - CloudFoundryClient cloudFoundryClient, + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, NameFactory nameFactory, - String planName, - String serviceBrokerName, - String serviceName, - Mono spaceId) { + @Qualifier("planName") String planName, + @Qualifier("serviceBrokerName") String serviceBrokerName, + @Qualifier("serviceName") String serviceName, + @Qualifier("spaceId") Mono spaceId) { return spaceId.flatMap( spaceId1 -> ServiceBrokerUtils.createServiceBroker( @@ -510,7 +533,9 @@ String serviceName(NameFactory nameFactory) { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono spaceId( - CloudFoundryClient cloudFoundryClient, Mono organizationId, String spaceName) { + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("organizationId") Mono organizationId, + @Qualifier("spaceName") String spaceName) { return organizationId .flatMap( orgId -> @@ -535,7 +560,9 @@ String spaceName(NameFactory nameFactory) { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") - Mono stackId(CloudFoundryClient cloudFoundryClient, Mono stackName) { + Mono stackId( + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("stackName") Mono stackName) { return stackName .flux() .flatMap( @@ -563,7 +590,7 @@ Mono stackId(CloudFoundryClient cloudFoundryClient, Mono stackNa */ @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") - Mono stackName(CloudFoundryClient cloudFoundryClient) { + Mono stackName(@Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient) { return PaginationUtils.requestClientV2Resources( page -> cloudFoundryClient @@ -581,11 +608,12 @@ Mono stackName(CloudFoundryClient cloudFoundryClient) { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono testLogCacheApp( - CloudFoundryClient cloudFoundryClient, - Mono spaceId, - Mono metricRegistrarServiceInstance, - String testLogCacheAppName, - String testLogCacheHostName, + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("spaceId") Mono spaceId, + @Qualifier("metricRegistrarServiceInstance") + Mono metricRegistrarServiceInstance, + @Qualifier("testLogCacheAppName") String testLogCacheAppName, + @Qualifier("testLogCacheHostName") String testLogCacheHostName, Path testLogCacheAppbits) { return metricRegistrarServiceInstance .zipWith(spaceId) @@ -633,7 +661,10 @@ String testLogCacheHostName(NameFactory nameFactory) { @Bean @DependsOn({"client", "userId"}) PasswordGrantTokenProvider tokenProvider( - String clientId, String clientSecret, String password, String username) { + @Qualifier("clientId") String clientId, + @Qualifier("clientSecret") String clientSecret, + @Qualifier("password") String password, + @Qualifier("username") String username) { return PasswordGrantTokenProvider.builder() .clientId(clientId) .clientSecret(clientSecret) @@ -643,6 +674,7 @@ PasswordGrantTokenProvider tokenProvider( } @Bean + @Qualifier("nonAdmin") ReactorUaaClient uaaClient(ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorUaaClient.builder() .connectionContext(connectionContext) @@ -652,7 +684,10 @@ ReactorUaaClient uaaClient(ConnectionContext connectionContext, TokenProvider to @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") - Mono userId(@Qualifier("admin") UaaClient uaaClient, String password, String username) { + Mono userId( + @Qualifier("admin") UaaClient uaaClient, + @Qualifier("password") String password, + @Qualifier("username") String username) { return uaaClient .users() .create( diff --git a/integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java b/integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java new file mode 100644 index 00000000000..5090a1c9cd7 --- /dev/null +++ b/integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java @@ -0,0 +1,79 @@ +package org.cloudfoundry.uaa; + +import java.time.Duration; +import org.cloudfoundry.reactor.uaa.UaaThrottler; +import org.cloudfoundry.uaa.ratelimit.Current; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; +import org.cloudfoundry.uaa.ratelimit.RatelimitRequest; +import org.cloudfoundry.uaa.ratelimit.RatelimitResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +@Component +public class UaaRatelimitInitializer implements InitializingBean { + private final Logger logger = LoggerFactory.getLogger("cloudfoundry-client.test"); + + private final Ratelimit ratelimitService; + private Integer commandlineRequestlimit; + + public UaaRatelimitInitializer(Ratelimit ratelimitService, Integer commandlineRequestLimit) { + this.ratelimitService = ratelimitService; + this.commandlineRequestlimit = commandlineRequestLimit; + } + + private void init() { + int limit = 0; + + Integer serverRatelimit = + ratelimitService + .getRatelimit(RatelimitRequest.builder().build()) + .map(response -> getServerRatelimit(response)) + .timeout(Duration.ofSeconds(5)) + .onErrorResume( + ex -> { + logger.error( + "Warning: could not fetch UAA rate limit, using default" + + " " + + 0 + + ". Cause: " + + ex); + return Mono.just(0); + }) + .block(); + + if (serverRatelimit != null) { + limit = serverRatelimit.intValue(); + } + if (commandlineRequestlimit != null) { + limit = commandlineRequestlimit.intValue(); + logger.debug("UaaRatelimitInitializer using configured value " + limit); + } + + UaaThrottler.setUaaRateLimit(limit); + } + + private Integer getServerRatelimit(RatelimitResponse response) { + Current curr = response.getCurrentData(); + if (!"ACTIVE".equals(curr.getStatus())) { + logger.debug( + "UaaRatelimitInitializer server ratelimit is not 'ACTIVE', but " + + curr.getStatus() + + ". Ignoring server value for ratelimit."); + return null; + } + Integer result = curr.getLimit() - 1; // using the value returned from server + // will not stop the 429-Errors. Decreased value is safe. + logger.debug( + "UaaRatelimitInitializer using server value for ratelimit -1, resulting in " + + result); + return result; + } + + @Override + public void afterPropertiesSet() throws Exception { + this.init(); + } +}