diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java index ed1f0319fb0ce..7776b5b78f5e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java @@ -39,6 +39,7 @@ import org.apache.ignite.client.SslProtocol; import org.apache.ignite.internal.client.thin.TcpIgniteClient; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; /** * {@link TcpIgniteClient} configuration. @@ -57,8 +58,11 @@ public final class ClientConfiguration implements Serializable { /** @serial Tcp no delay. */ private boolean tcpNoDelay = true; - /** @serial Timeout. 0 means infinite. */ - private int timeout; + /** @serial Connection timeout in milliseconds. 0 means infinite. */ + private int connTimeout; + + /** @serial Request timeout in milliseconds. 0 means infinite. */ + private int reqTimeout; /** @serial Send buffer size. 0 means system default. */ private int sndBufSize = 32 * 1024; @@ -227,19 +231,62 @@ public ClientConfiguration setTcpNoDelay(boolean tcpNoDelay) { } /** - * @return Send/receive timeout in milliseconds. + * @deprecated Use {@link #getConnectionTimeout()} and {@link #getRequestTimeout()} instead. + * @return Request timeout in milliseconds. */ + @Deprecated public int getTimeout() { - return timeout; + if (reqTimeout != connTimeout ) + U.warn(logger, String.format( + "Deprecated getTimeout() API is used while request timeout (%d) differs from connection timeout (%d). " + + "Returning request timeout. Please use getRequestTimeout() and getConnectionTimeout() instead.", + reqTimeout, connTimeout + )); + + return reqTimeout; } /** + * @deprecated Use {@link #setConnectionTimeout(int)} and {@link #setRequestTimeout(int)} instead. * @param timeout Send/receive timeout in milliseconds. * @return {@code this} for chaining. */ + @Deprecated public ClientConfiguration setTimeout(int timeout) { - this.timeout = timeout; + this.connTimeout = timeout; + this.reqTimeout = timeout; + return this; + } + /** + * @return Connection timeout in milliseconds. 0 means infinite. + */ + public int getConnectionTimeout() { + return connTimeout; + } + + /** + * @param connTimeout Connection timeout in milliseconds. 0 means infinite. + * @return {@code this} for chaining. + */ + public ClientConfiguration setConnectionTimeout(int connTimeout) { + this.connTimeout = connTimeout; + return this; + } + + /** + * @return Request timeout in milliseconds. 0 means infinite. + */ + public int getRequestTimeout() { + return reqTimeout; + } + + /** + * @param reqTimeout Request timeout in milliseconds. 0 means infinite. + * @return {@code this} for chaining. + */ + public ClientConfiguration setRequestTimeout(int reqTimeout) { + this.reqTimeout = reqTimeout; return this; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java index 97cdc822159de..fc77070ed59da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java @@ -44,8 +44,11 @@ final class ClientChannelConfiguration { /** Tcp no delay. */ private final boolean tcpNoDelay; - /** Timeout. */ - private final int timeout; + /** Connection timeout. */ + private final int connTimeout; + + /** Request timeout. */ + private final int reqTimeout; /** Send buffer size. */ private final int sndBufSize; @@ -123,7 +126,8 @@ final class ClientChannelConfiguration { ClientChannelConfiguration(ClientConfiguration cfg, List addrs) { this.sslMode = cfg.getSslMode(); this.tcpNoDelay = cfg.isTcpNoDelay(); - this.timeout = cfg.getTimeout(); + this.connTimeout = cfg.getConnectionTimeout(); + this.reqTimeout = cfg.getRequestTimeout(); this.sndBufSize = cfg.getSendBufferSize(); this.rcvBufSize = cfg.getReceiveBufferSize(); this.sslClientCertKeyStorePath = cfg.getSslClientCertificateKeyStorePath(); @@ -172,10 +176,26 @@ public boolean isTcpNoDelay() { } /** - * @return Timeout. + * @deprecated Use {@link #getConnectionTimeout()} and {@link #getRequestTimeout()} instead. + * @return Request timeout. */ + @Deprecated public int getTimeout() { - return timeout; + return reqTimeout; + } + + /** + * @return Connection timeout. + */ + public int getConnectionTimeout() { + return connTimeout; + } + + /** + * @return Request timeout. + */ + public int getRequestTimeout() { + return reqTimeout; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java index f8316faf64c2c..378b3cc701c00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java @@ -160,8 +160,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon /** Executor for async operation listeners. */ private final Executor asyncContinuationExecutor; - /** Send/receive timeout in milliseconds. */ - private final int timeout; + /** Connection timeout in milliseconds. */ + private final int connTimeout; + + /** Request timeout in milliseconds. */ + private final int reqTimeout; /** Heartbeat timer. */ private final Timer heartbeatTimer; @@ -195,7 +198,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon Executor cfgExec = cfg.getAsyncContinuationExecutor(); asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool(); - timeout = cfg.getTimeout(); + connTimeout = cfg.getConnectionTimeout(); + reqTimeout = cfg.getRequestTimeout(); List addrs = cfg.getAddresses(); @@ -419,7 +423,7 @@ private T receive(ClientRequestFuture pendingReq, Function 0 ? pendingReq.get(timeout) : pendingReq.get(); + ByteBuffer payload = reqTimeout > 0 ? pendingReq.get(reqTimeout) : pendingReq.get(); T res = null; if (payload != null && payloadReader != null) @@ -736,7 +740,7 @@ private void handshake(ProtocolVersion ver, String user, String pwd, Map 0 ? fut.get(timeout) : fut.get(); + ByteBuffer buf = connTimeout > 0 ? fut.get(connTimeout) : fut.get(); BinaryInputStream res = BinaryStreams.inputStream(buf); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java index d2f6895ac1dcb..9c2b78a457a5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java @@ -89,7 +89,7 @@ public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) { else filters = new GridNioFilter[] {codecFilter}; - connTimeout = cfg.getTimeout(); + connTimeout = cfg.getConnectionTimeout(); try { srv = GridNioServer.builder() diff --git a/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java index 8f882653b099a..7893db65006ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java @@ -59,7 +59,8 @@ public class ClientConfigurationTest { public void testSerialization() throws IOException, ClassNotFoundException { ClientConfiguration target = new ClientConfiguration() .setAddresses("127.0.0.1:10800", "127.0.0.1:10801") - .setTimeout(123) + .setConnectionTimeout(123) + .setRequestTimeout(123) .setBinaryConfiguration(new BinaryConfiguration() .setClassNames(Collections.singleton("Person")) ) diff --git a/modules/core/src/test/java/org/apache/ignite/client/Comparers.java b/modules/core/src/test/java/org/apache/ignite/client/Comparers.java index 4442196c40d73..bebdfeabe731b 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/Comparers.java +++ b/modules/core/src/test/java/org/apache/ignite/client/Comparers.java @@ -36,7 +36,8 @@ public static boolean equal(ClientConfiguration a, Object o) { return Arrays.equals(a.getAddresses(), b.getAddresses()) && a.isTcpNoDelay() == b.isTcpNoDelay() && - a.getTimeout() == b.getTimeout() && + a.getConnectionTimeout() == b.getConnectionTimeout() && + a.getRequestTimeout() == b.getRequestTimeout() && a.getSendBufferSize() == b.getSendBufferSize() && a.getReceiveBufferSize() == b.getReceiveBufferSize(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java index bcb7627f8ecaf..0bf585bcc0607 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java @@ -158,7 +158,8 @@ public void testUnreachableAddressDiscoveredDoesNotPreventClientInit() throws Ex // Config has good server address, client discovery returns unreachable address. // We expect the client to connect to the good address and ignore the unreachable one. ClientConfiguration ccfg = new ClientConfiguration() - .setTimeout(2000) + .setConnectionTimeout(2000) + .setRequestTimeout(2000) .setAddresses("127.0.0.1:" + DFLT_PORT); IgniteClient client = Ignition.startClient(ccfg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java index 882a4df96bfc1..23bb017456879 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java @@ -39,6 +39,7 @@ import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import org.apache.ignite.internal.binary.streams.BinaryStreams; @@ -67,7 +68,7 @@ public class TimeoutTest extends AbstractThinClientTest { /** {@inheritDoc} */ @Override protected ClientConfiguration getClientConfiguration() { - return super.getClientConfiguration().setTimeout(TIMEOUT); + return super.getClientConfiguration().setConnectionTimeout(TIMEOUT).setRequestTimeout(TIMEOUT); } /** @@ -217,4 +218,107 @@ public void testClientTimeoutOnOperation() throws Exception { } } } + + /** + * Test that connection timeout is independent of request timeout during connection establishment. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testConnectionTimeoutIndependentOfRequest() throws Exception { + ServerSocket sock = new ServerSocket(); + sock.bind(new InetSocketAddress("127.0.0.1", DFLT_PORT)); + + CountDownLatch connectionAccepted = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + try { + Socket accepted = sock.accept(); + connectionAccepted.countDown(); + + Thread.sleep(2000); + + U.closeQuiet(accepted); + } + catch (Exception e) { + throw new IgniteException("Accept thread failed: " + e.getMessage(), e); + } + }); + + try { + ClientConfiguration cfg = new ClientConfiguration() + .setAddresses("127.0.0.1:" + DFLT_PORT) + .setConnectionTimeout(500) + .setRequestTimeout(Integer.MAX_VALUE); + + GridTestUtils.assertThrowsWithCause( + () -> Ignition.startClient(cfg), + IgniteFutureTimeoutCheckedException.class + ); + } + finally { + U.closeQuiet(sock); + } + + assertTrue("Connection should have been accepted", connectionAccepted.await(1, TimeUnit.SECONDS)); + + fut.get(); + } + + /** + * Test that request timeout is independent of connection timeout during operations. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testRequestTimeoutIndependentOfConnection() throws Exception { + IgniteConfiguration igniteCfg = getConfiguration(getTestIgniteInstanceName()); + igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setHandshakeTimeout(Integer.MAX_VALUE)); + + try (Ignite ignite = startGrid(igniteCfg)) { + ClientConfiguration cfg = getClientConfiguration(ignite) + .setConnectionTimeout(Integer.MAX_VALUE) + .setRequestTimeout(500); + + try (IgniteClient client = Ignition.startClient(cfg)) { + ClientCache cache = client.getOrCreateCache("testTimeoutCache"); + + ClientCacheConfiguration txCacheCfg = new ClientCacheConfiguration() + .setName("txCache") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + ClientCache txCache = client.getOrCreateCache(txCacheCfg); + + CyclicBarrier barrier = new CyclicBarrier(2); + + IgniteInternalFuture blockingThread = GridTestUtils.runAsync(() -> { + try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + txCache.put(1, "blocked"); + + barrier.await(2, TimeUnit.SECONDS); + + // Wait for main thread to time out + barrier.await(2, TimeUnit.SECONDS); + } + catch (Exception e) { + throw new IgniteException(e); + } + }); + + barrier.await(2, TimeUnit.SECONDS); + + try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + GridTestUtils.assertThrowsWithCause( + () -> txCache.put(1, "should timeout"), + IgniteFutureTimeoutCheckedException.class + ); + } + + barrier.await(2, TimeUnit.SECONDS); + + cache.put(2, "still works"); + assertEquals("still works", cache.get(2)); + + blockingThread.get(); + } + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java index 27bb599c8619a..6c53a23cd2a4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java @@ -68,7 +68,9 @@ public void testClientSessionOutboundQueueLimit() throws Exception { try ( IgniteClient cli = Ignition.startClient(new ClientConfiguration() .setAddresses("127.0.0.1:10800") - .setTimeout(5000) // Server will drop packets intended for the client. So client can hang on handshake during reconnect. + .setConnectionTimeout(5000) // Server will drop packets intended for the client. + // So client can hang on handshake during reconnect. + .setRequestTimeout(5000) .setRetryLimit(1) // Let's not retry operations if the channel was closed while waiting for a response. .setEventListeners(new ConnectionEventListener() { @Override public void onConnectionClosed(ConnectionClosedEvent event) {