diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index f03bd20577..ece5c1c631 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -29,10 +29,9 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.List; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.Cancellable; @@ -44,21 +43,15 @@ import org.apache.hc.core5.function.Decorator; import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.http.ConnectionClosedException; -import org.apache.hc.core5.http.EntityDetails; -import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.impl.DefaultAddressResolver; import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; -import org.apache.hc.core5.http.nio.CapacityChannel; -import org.apache.hc.core5.http.nio.DataStreamChannel; import org.apache.hc.core5.http.nio.HandlerFactory; -import org.apache.hc.core5.http.nio.RequestChannel; import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; import org.apache.hc.core5.http.nio.command.ShutdownCommand; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; @@ -87,6 +80,12 @@ public class H2MultiplexingRequester extends AsyncRequester { private final H2ConnPool connPool; + /** + * Hard cap on per-connection pending commands (queued). + * {@code <= 0} disables the cap. + */ + private final int maxRequestsPerConnection; + /** * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class. */ @@ -100,11 +99,13 @@ public H2MultiplexingRequester( final Resolver addressResolver, final TlsStrategy tlsStrategy, final IOReactorMetricsListener threadPoolListener, - final IOWorkerSelector workerSelector) { + final IOWorkerSelector workerSelector, + final int maxRequestsPerConnection) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); + this.maxRequestsPerConnection = maxRequestsPerConnection; } public void closeIdle(final TimeValue idleTime) { @@ -182,83 +183,38 @@ private void execute( if (request.getAuthority() == null) { request.setAuthority(new URIAuthority(host)); } + if (request.getScheme() == null) { + request.setScheme(host.getSchemeName()); + } connPool.getSession(host, timeout, new FutureCallback() { @Override public void completed(final IOSession ioSession) { - final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() { - - @Override - public void releaseResources() { - exchangeHandler.releaseResources(); - } - - @Override - public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { - channel.sendRequest(request, entityDetails, httpContext); - } - - @Override - public int available() { - return exchangeHandler.available(); - } - @Override - public void produce(final DataStreamChannel channel) throws IOException { - exchangeHandler.produce(channel); - } - - @Override - public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { - exchangeHandler.consumeInformation(response, httpContext); - } - - @Override - public void consumeResponse( - final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { - exchangeHandler.consumeResponse(response, entityDetails, httpContext); - } - - @Override - public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - exchangeHandler.updateCapacity(capacityChannel); - } - - @Override - public void consume(final ByteBuffer src) throws IOException { - exchangeHandler.consume(src); - } - - @Override - public void streamEnd(final List trailers) throws HttpException, IOException { - exchangeHandler.streamEnd(trailers); - } - - @Override - public void cancel() { - exchangeHandler.cancel(); - } - - @Override - public void failed(final Exception cause) { - exchangeHandler.failed(cause); - } - - }; final Timeout socketTimeout = ioSession.getSocketTimeout(); - ioSession.enqueue(new RequestExecutionCommand( - handlerProxy, - pushHandlerFactory, - context, - streamControl -> { - cancellableDependency.setDependency(streamControl); - if (socketTimeout != null) { - streamControl.setTimeout(socketTimeout); - } - }), - Command.Priority.NORMAL); - if (!ioSession.isOpen()) { - exchangeHandler.failed(new ConnectionClosedException()); + final RequestExecutionCommand command = new RequestExecutionCommand( + exchangeHandler, + pushHandlerFactory, + context, + streamControl -> { + cancellableDependency.setDependency(streamControl); + if (socketTimeout != null) { + streamControl.setTimeout(socketTimeout); + } + }); + + final int max = maxRequestsPerConnection; + final int limit = max > 0 ? max : 0; + final boolean enqueued = ioSession.enqueue(command, Command.Priority.NORMAL, limit); + if (!enqueued) { + if (max > 0 && ioSession.isOpen()) { + exchangeHandler.failed(new RejectedExecutionException( + "Maximum number of pending requests per connection reached (max=" + max + ")")); + } else { + exchangeHandler.failed(new ConnectionClosedException()); + } + exchangeHandler.releaseResources(); + return; } } @@ -350,4 +306,30 @@ public H2ConnPool getConnPool() { return connPool; } + private static final class CancellableExecution implements Cancellable, CancellableDependency { + + private volatile Cancellable dependency; + + @Override + public void setDependency(final Cancellable dependency) { + this.dependency = dependency; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel() { + final Cancellable local = this.dependency; + if (local != null) { + local.cancel(); + return true; + } + return false; + } + + } + } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index a19e7913fc..8ee8e8cd81 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; import org.apache.hc.core5.function.Supplier; @@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap { private IOReactorMetricsListener threadPoolListener; + private int maxRequestsPerConnection; + private H2MultiplexingRequesterBootstrap() { this.routeEntries = new ArrayList<>(); } @@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final return this; } + /** + * Sets a hard limit on the number of pending request execution commands that can be queued per connection. + * When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}. + * A value {@code <= 0} disables the limit (default). + * Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight + * concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS). + * + * @param max maximum number of pending requests per connection; {@code <= 0} to disable the limit. + * @return this instance. + * @since 5.5 + */ + @Experimental + public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) { + this.maxRequestsPerConnection = max; + return this; + } + /** * Sets {@link H2StreamListener} instance. * @@ -274,7 +294,8 @@ public H2MultiplexingRequester create() { DefaultAddressResolver.INSTANCE, tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), threadPoolListener, - null); + null, + maxRequestsPerConnection); } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java new file mode 100644 index 0000000000..d82975d041 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java @@ -0,0 +1,245 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; + +/** + * Local integration example that exercises {@code H2MultiplexingRequesterBootstrap#setMaxRequestsPerConnection(int)}. + *

+ * The example starts a local HTTP/2 server and a single-connection HTTP/2 requester. The server responds with a fixed + * delay to keep streams busy and make the client build up a backlog of request execution commands on the connection. + * With {@code maxRequestsPerConnection} set to a small value, submissions beyond the configured cap fail fast with + * {@link java.util.concurrent.RejectedExecutionException}. This demonstrates a per-connection hard cap on queued + * (pending) requests using the {@link org.apache.hc.core5.reactor.IOSession} command queue, not a separate client-side + * queue. + *

+ * Note this cap limits the number of pending execution commands associated with a single connection. Protocol-level + * concurrency (in-flight streams) is still governed by HTTP/2 settings (for example {@code MAX_CONCURRENT_STREAMS}) + * and server behaviour. + * + * @since 5.5 + */ +public final class H2MaxRequestsPerConnectionLocalExample { + + public static void main(final String[] args) throws Exception { + final int maxPerConn = 2; // keep small + final int totalRequests = 50; // make larger + final Timeout timeout = Timeout.ofSeconds(30); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") // avoids 421 + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>( + entityDetails != null ? new DiscardingEntityConsumer() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext context) { + + final String path = message.getHead().getPath(); + System.out.println("server accepted " + path + " (reply in 2s)"); + + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ex) { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(500) + .setEntity(ex.toString(), ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + // ignore + } + } + }, 2, TimeUnit.SECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + System.out.println("server on 127.0.0.1:" + port); + + final H2Config clientH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(clientH2Config) + .setMaxRequestsPerConnection(maxPerConn) + .create(); + + requester.start(); + + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + + // Warmup (establish connection) + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + @Override + public void completed(final Message result) { + System.out.println("warmup -> " + result.getHead().getCode()); + } + + @Override + public void failed(final Exception ex) { + System.out.println("warmup failed -> " + ex.getClass().getName() + ": " + ex.getMessage()); + } + + @Override + public void cancelled() { + System.out.println("warmup cancelled"); + } + }).get(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger rejected = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + final ExecutorService exec = Executors.newFixedThreadPool(16); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(() -> { + final String path = "/slow?i=" + id; + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath(path).build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + ok.incrementAndGet(); + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + if (ex instanceof RejectedExecutionException) { + rejected.incrementAndGet(); + } else { + failed.incrementAndGet(); + } + latch.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + latch.countDown(); + } + }); + }); + } + + final boolean done = latch.await(60, TimeUnit.SECONDS); + System.out.println("done=" + done + " ok=" + ok.get() + ", rejected=" + rejected.get() + ", failed=" + failed.get()); + + exec.shutdownNow(); + + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java new file mode 100644 index 0000000000..7d9bb191f0 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java @@ -0,0 +1,229 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.impl.nio.bootstrap; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestH2MultiplexingRequesterMaxRequestsPerConnection { + + @Test + @org.junit.jupiter.api.Timeout(value = 60, unit = TimeUnit.SECONDS) + void testRejectsWhenLimitReached() throws Exception { + final int maxPerConn = 2; + final int totalRequests = 30; + final Timeout timeout = Timeout.ofSeconds(30); + final long serverDelayMillis = 5000; + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(1) // force backlog + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") // avoid 421 + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>( + entityDetails != null ? new DiscardingEntityConsumer<>() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + + final HttpCoreContext context = HttpCoreContext.cast(localContext); + final String path = message.getHead().getPath(); + + final Runnable task = () -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + // ignore + } + }; + + if ("/warmup".equals(path)) { + task.run(); + } else { + scheduler.schedule(task, serverDelayMillis, TimeUnit.MILLISECONDS); + } + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(H2Config.custom().setPushEnabled(false).build()) + .setMaxRequestsPerConnection(maxPerConn) + .create(); + + + requester.start(); + + try { + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + + // Warmup + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + null).get(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger rejected = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + + final CountDownLatch done = new CountDownLatch(totalRequests); + final CountDownLatch start = new CountDownLatch(1); + final ExecutorService exec = Executors.newFixedThreadPool(16); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(new Runnable() { + @Override + public void run() { + try { + start.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + done.countDown(); + return; + } + + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/slow?i=" + id).build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + ok.incrementAndGet(); + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + if (ex instanceof RejectedExecutionException) { + rejected.incrementAndGet(); + } else { + failed.incrementAndGet(); + } + done.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + done.countDown(); + } + }); + } + }); + } + + start.countDown(); + + final boolean allDone = done.await(60, TimeUnit.SECONDS); + exec.shutdownNow(); + + Assertions.assertTrue(allDone, "Timed out"); + Assertions.assertEquals(totalRequests, ok.get() + rejected.get() + failed.get()); + Assertions.assertTrue(rejected.get() > 0, "Expected at least one RejectedExecutionException"); + Assertions.assertEquals(0, failed.get(), "Unexpected non-rejection failures: " + failed.get()); + } finally { + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + } +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java index fd3813fba5..27bd9ec240 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java @@ -96,6 +96,27 @@ enum Status { */ void enqueue(Command command, Command.Priority priority); + /** + * Returns the number of enqueued commands pending execution. + * + * @since 5.5 + */ + default int getPendingCommandCount() { + return 0; + } + + /** + * Inserts {@link Command} at the end of the command queue if the number of + * enqueued commands pending execution is below the provided maximum. + * + * @return {@code true} if the command was enqueued, {@code false} otherwise. + * @since 5.5 + */ + default boolean enqueue(final Command command, final Command.Priority priority, final int maxPendingCommands) { + enqueue(command, priority); + return true; + } + /** * Tests if there enqueued commands pending execution. * diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java index cf2b6023d3..7792c39c62 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java @@ -36,6 +36,7 @@ import java.nio.channels.SocketChannel; import java.util.Deque; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -55,6 +56,7 @@ class IOSessionImpl implements IOSession { private final SelectionKey key; private final SocketChannel channel; private final Deque commandQueue; + private final AtomicInteger commandQueueSize; private final Lock lock; private final String id; private final AtomicReference handlerRef; @@ -73,6 +75,7 @@ public IOSessionImpl(final String type, final SelectionKey key, final SocketChan this.channel = Args.notNull(socketChannel, "Socket channel"); this.sessionClosedCallback = sessionClosedCallback; this.commandQueue = new ConcurrentLinkedDeque<>(); + this.commandQueueSize = new AtomicInteger(0); this.lock = new ReentrantLock(); this.socketTimeout = Timeout.INFINITE; this.id = String.format(type + "-%010d", COUNT.getAndIncrement()); @@ -106,16 +109,50 @@ public Lock getLock() { @Override public void enqueue(final Command command, final Command.Priority priority) { + enqueue(command, priority, 0); + } + + @Override + public int getPendingCommandCount() { + return commandQueueSize.get(); + } + + @Override + public boolean enqueue( + final Command command, + final Command.Priority priority, + final int maxPendingCommands) { + Args.notNull(command, "Command"); + + if (maxPendingCommands > 0) { + for (;;) { + final int current = commandQueueSize.get(); + if (current >= maxPendingCommands) { + return false; + } + if (commandQueueSize.compareAndSet(current, current + 1)) { + break; + } + } + } else { + commandQueueSize.incrementAndGet(); + } + if (priority == Command.Priority.IMMEDIATE) { commandQueue.addFirst(command); } else { - commandQueue.add(command); + commandQueue.addLast(command); } + if (isOpen()) { setEvent(SelectionKey.OP_WRITE); - } else { - command.cancel(); + return true; } + command.cancel(); + if (commandQueue.remove(command)) { + commandQueueSize.decrementAndGet(); + } + return false; } @Override @@ -125,7 +162,11 @@ public boolean hasCommands() { @Override public Command poll() { - return commandQueue.poll(); + final Command command = commandQueue.poll(); + if (command != null) { + commandQueueSize.decrementAndGet(); + } + return command; } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java index d0bd866287..3e9e1b3b7c 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java @@ -409,4 +409,18 @@ public String toString() { return Objects.toString(currentSession != null ? currentSession : ioSession, null); } + @Override + public int getPendingCommandCount() { + return currentSessionRef.get().getPendingCommandCount(); + } + + @Override + public boolean enqueue( + final Command command, + final Command.Priority priority, + final int maxPendingCommands) { + return currentSessionRef.get().enqueue(command, priority, maxPendingCommands); + } + + } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java index 9cb55fcd43..3f64ad07ea 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java @@ -932,4 +932,17 @@ public String toString() { } } + @Override + public int getPendingCommandCount() { + return session.getPendingCommandCount(); + } + + @Override + public boolean enqueue( + final Command command, + final Command.Priority priority, + final int maxPendingCommands) { + return session.enqueue(command, priority, maxPendingCommands); + } + } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestIOSessionImplMaxPendingCommands.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestIOSessionImplMaxPendingCommands.java new file mode 100644 index 0000000000..81f2f59369 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestIOSessionImplMaxPendingCommands.java @@ -0,0 +1,98 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +class TestIOSessionImplMaxPendingCommands { + + private static final class DummyCommand implements Command { + + private final AtomicBoolean cancelled = new AtomicBoolean(false); + + @Override + public boolean cancel() { + return cancelled.compareAndSet(false, true); + } + + } + + @Test + @Timeout(5) + void enqueueShouldRejectWhenQueueIsFull() throws Exception { + final ServerSocketChannel server = ServerSocketChannel.open(); + server.bind(new InetSocketAddress("127.0.0.1", 0)); + final int port = ((InetSocketAddress) server.getLocalAddress()).getPort(); + + final SocketChannel client = SocketChannel.open(); + client.connect(new InetSocketAddress("127.0.0.1", port)); + final SocketChannel accepted = server.accept(); + + client.configureBlocking(false); + final Selector selector = Selector.open(); + final SelectionKey key = client.register(selector, SelectionKey.OP_READ); + + final IOSessionImpl session = new IOSessionImpl("c", key, client, null); + + try { + final int max = 2; + + final DummyCommand c1 = new DummyCommand(); + final DummyCommand c2 = new DummyCommand(); + final DummyCommand c3 = new DummyCommand(); + + assertTrue(session.enqueue(c1, Command.Priority.NORMAL, max)); + assertTrue(session.enqueue(c2, Command.Priority.NORMAL, max)); + assertFalse(session.enqueue(c3, Command.Priority.NORMAL, max)); + + assertEquals(2, session.getPendingCommandCount()); + + session.poll(); + assertEquals(1, session.getPendingCommandCount()); + + session.poll(); + assertEquals(0, session.getPendingCommandCount()); + } finally { + session.close(); + accepted.close(); + server.close(); + selector.close(); + } + } +}