Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.client.SslProtocol;
import org.apache.ignite.internal.client.thin.TcpIgniteClient;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
* {@link TcpIgniteClient} configuration.
Expand All @@ -57,8 +58,11 @@ public final class ClientConfiguration implements Serializable {
/** @serial Tcp no delay. */
private boolean tcpNoDelay = true;

/** @serial Timeout. 0 means infinite. */
private int timeout;
/** @serial Connection timeout in milliseconds. 0 means infinite. */
private int connTimeout;

/** @serial Request timeout in milliseconds. 0 means infinite. */
Copy link
Member

@timoninmaxim timoninmaxim Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check that actual javadocs contains lines started with 'serial'. For which purpose this tag is used?

private int reqTimeout;

/** @serial Send buffer size. 0 means system default. */
private int sndBufSize = 32 * 1024;
Expand Down Expand Up @@ -227,19 +231,62 @@ public ClientConfiguration setTcpNoDelay(boolean tcpNoDelay) {
}

/**
* @return Send/receive timeout in milliseconds.
* @deprecated Use {@link #getConnectionTimeout()} and {@link #getRequestTimeout()} instead.
* @return Request timeout in milliseconds.
*/
@Deprecated
public int getTimeout() {
return timeout;
if (reqTimeout != connTimeout )
U.warn(logger, String.format(
"Deprecated getTimeout() API is used while request timeout (%d) differs from connection timeout (%d). " +
"Returning request timeout. Please use getRequestTimeout() and getConnectionTimeout() instead.",
reqTimeout, connTimeout
));

return reqTimeout;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add WARN log if reqTimeout != connTimeout. The warn message should mention that deprecated API is used and request timeout is returned.

}

/**
* @deprecated Use {@link #setConnectionTimeout(int)} and {@link #setRequestTimeout(int)} instead.
* @param timeout Send/receive timeout in milliseconds.
* @return {@code this} for chaining.
*/
@Deprecated
public ClientConfiguration setTimeout(int timeout) {
this.timeout = timeout;
this.connTimeout = timeout;
this.reqTimeout = timeout;
return this;
}

/**
* @return Connection timeout in milliseconds. 0 means infinite.
*/
public int getConnectionTimeout() {
return connTimeout;
}

/**
* @param connTimeout Connection timeout in milliseconds. 0 means infinite.
* @return {@code this} for chaining.
*/
public ClientConfiguration setConnectionTimeout(int connTimeout) {
this.connTimeout = connTimeout;
return this;
}

/**
* @return Request timeout in milliseconds. 0 means infinite.
*/
public int getRequestTimeout() {
return reqTimeout;
}

/**
* @param reqTimeout Request timeout in milliseconds. 0 means infinite.
* @return {@code this} for chaining.
*/
public ClientConfiguration setRequestTimeout(int reqTimeout) {
this.reqTimeout = reqTimeout;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ final class ClientChannelConfiguration {
/** Tcp no delay. */
private final boolean tcpNoDelay;

/** Timeout. */
private final int timeout;
/** Connection timeout. */
private final int connTimeout;

/** Request timeout. */
private final int reqTimeout;

/** Send buffer size. */
private final int sndBufSize;
Expand Down Expand Up @@ -123,7 +126,8 @@ final class ClientChannelConfiguration {
ClientChannelConfiguration(ClientConfiguration cfg, List<InetSocketAddress> addrs) {
this.sslMode = cfg.getSslMode();
this.tcpNoDelay = cfg.isTcpNoDelay();
this.timeout = cfg.getTimeout();
this.connTimeout = cfg.getConnectionTimeout();
this.reqTimeout = cfg.getRequestTimeout();
this.sndBufSize = cfg.getSendBufferSize();
this.rcvBufSize = cfg.getReceiveBufferSize();
this.sslClientCertKeyStorePath = cfg.getSslClientCertificateKeyStorePath();
Expand Down Expand Up @@ -172,10 +176,26 @@ public boolean isTcpNoDelay() {
}

/**
* @return Timeout.
* @deprecated Use {@link #getConnectionTimeout()} and {@link #getRequestTimeout()} instead.
* @return Request timeout.
*/
@Deprecated
public int getTimeout() {
return timeout;
return reqTimeout;
}

/**
* @return Connection timeout.
*/
public int getConnectionTimeout() {
return connTimeout;
}

/**
* @return Request timeout.
*/
public int getRequestTimeout() {
return reqTimeout;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Executor for async operation listeners. */
private final Executor asyncContinuationExecutor;

/** Send/receive timeout in milliseconds. */
private final int timeout;
/** Connection timeout in milliseconds. */
private final int connTimeout;

/** Request timeout in milliseconds. */
private final int reqTimeout;

/** Heartbeat timer. */
private final Timer heartbeatTimer;
Expand Down Expand Up @@ -195,7 +198,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
Executor cfgExec = cfg.getAsyncContinuationExecutor();
asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool();

timeout = cfg.getTimeout();
connTimeout = cfg.getConnectionTimeout();
reqTimeout = cfg.getRequestTimeout();

List<InetSocketAddress> addrs = cfg.getAddresses();

Expand Down Expand Up @@ -419,7 +423,7 @@ private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChann
long startTimeNanos = pendingReq.startTimeNanos;

try {
ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
ByteBuffer payload = reqTimeout > 0 ? pendingReq.get(reqTimeout) : pendingReq.get();

T res = null;
if (payload != null && payloadReader != null)
Expand Down Expand Up @@ -736,7 +740,7 @@ private void handshake(ProtocolVersion ver, String user, String pwd, Map<String,
handshakeReq(ver, user, pwd, userAttrs);

try {
ByteBuffer buf = timeout > 0 ? fut.get(timeout) : fut.get();
ByteBuffer buf = connTimeout > 0 ? fut.get(connTimeout) : fut.get();

BinaryInputStream res = BinaryStreams.inputStream(buf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
else
filters = new GridNioFilter[] {codecFilter};

connTimeout = cfg.getTimeout();
connTimeout = cfg.getConnectionTimeout();

try {
srv = GridNioServer.<ByteBuffer>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public class ClientConfigurationTest {
public void testSerialization() throws IOException, ClassNotFoundException {
ClientConfiguration target = new ClientConfiguration()
.setAddresses("127.0.0.1:10800", "127.0.0.1:10801")
.setTimeout(123)
.setConnectionTimeout(123)
.setRequestTimeout(123)
.setBinaryConfiguration(new BinaryConfiguration()
.setClassNames(Collections.singleton("Person"))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public static boolean equal(ClientConfiguration a, Object o) {

return Arrays.equals(a.getAddresses(), b.getAddresses()) &&
a.isTcpNoDelay() == b.isTcpNoDelay() &&
a.getTimeout() == b.getTimeout() &&
a.getConnectionTimeout() == b.getConnectionTimeout() &&
a.getRequestTimeout() == b.getRequestTimeout() &&
a.getSendBufferSize() == b.getSendBufferSize() &&
a.getReceiveBufferSize() == b.getReceiveBufferSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public void testUnreachableAddressDiscoveredDoesNotPreventClientInit() throws Ex
// Config has good server address, client discovery returns unreachable address.
// We expect the client to connect to the good address and ignore the unreachable one.
ClientConfiguration ccfg = new ClientConfiguration()
.setTimeout(2000)
.setConnectionTimeout(2000)
.setRequestTimeout(2000)
.setAddresses("127.0.0.1:" + DFLT_PORT);

IgniteClient client = Ignition.startClient(ccfg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryStreams;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class TimeoutTest extends AbstractThinClientTest {

/** {@inheritDoc} */
@Override protected ClientConfiguration getClientConfiguration() {
return super.getClientConfiguration().setTimeout(TIMEOUT);
return super.getClientConfiguration().setConnectionTimeout(TIMEOUT).setRequestTimeout(TIMEOUT);
}

/**
Expand Down Expand Up @@ -217,4 +218,107 @@ public void testClientTimeoutOnOperation() throws Exception {
}
}
}

/**
* Test that connection timeout is independent of request timeout during connection establishment.
*/
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testConnectionTimeoutIndependentOfRequest() throws Exception {
ServerSocket sock = new ServerSocket();
sock.bind(new InetSocketAddress("127.0.0.1", DFLT_PORT));

CountDownLatch connectionAccepted = new CountDownLatch(1);

IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
try {
Socket accepted = sock.accept();
connectionAccepted.countDown();

Thread.sleep(2000);

U.closeQuiet(accepted);
}
catch (Exception e) {
throw new IgniteException("Accept thread failed: " + e.getMessage(), e);
}
});

try {
ClientConfiguration cfg = new ClientConfiguration()
.setAddresses("127.0.0.1:" + DFLT_PORT)
.setConnectionTimeout(500)
.setRequestTimeout(Integer.MAX_VALUE);

GridTestUtils.assertThrowsWithCause(
() -> Ignition.startClient(cfg),
IgniteFutureTimeoutCheckedException.class
);
}
finally {
U.closeQuiet(sock);
}

assertTrue("Connection should have been accepted", connectionAccepted.await(1, TimeUnit.SECONDS));

fut.get();
}

/**
* Test that request timeout is independent of connection timeout during operations.
*/
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testRequestTimeoutIndependentOfConnection() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar for this test:

  1. Set server and connection timeous to Integer.MAX_VALUE
  2. Just check that exception contains timeout exception (X.hasCause(IgniteFutureTimeoutCheckedException.class))

IgniteConfiguration igniteCfg = getConfiguration(getTestIgniteInstanceName());
igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setHandshakeTimeout(Integer.MAX_VALUE));

try (Ignite ignite = startGrid(igniteCfg)) {
ClientConfiguration cfg = getClientConfiguration(ignite)
.setConnectionTimeout(Integer.MAX_VALUE)
.setRequestTimeout(500);

try (IgniteClient client = Ignition.startClient(cfg)) {
ClientCache<Object, Object> cache = client.getOrCreateCache("testTimeoutCache");

ClientCacheConfiguration txCacheCfg = new ClientCacheConfiguration()
.setName("txCache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

ClientCache<Object, Object> txCache = client.getOrCreateCache(txCacheCfg);

CyclicBarrier barrier = new CyclicBarrier(2);

IgniteInternalFuture<?> blockingThread = GridTestUtils.runAsync(() -> {
try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
txCache.put(1, "blocked");

barrier.await(2, TimeUnit.SECONDS);

// Wait for main thread to time out
barrier.await(2, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For which reason you awaits main thread after sleeping?

}
catch (Exception e) {
throw new IgniteException(e);
}
});

barrier.await(2, TimeUnit.SECONDS);

try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
GridTestUtils.assertThrowsWithCause(
() -> txCache.put(1, "should timeout"),
IgniteFutureTimeoutCheckedException.class
);
}

barrier.await(2, TimeUnit.SECONDS);

cache.put(2, "still works");
assertEquals("still works", cache.get(2));

blockingThread.get();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public void testClientSessionOutboundQueueLimit() throws Exception {
try (
IgniteClient cli = Ignition.startClient(new ClientConfiguration()
.setAddresses("127.0.0.1:10800")
.setTimeout(5000) // Server will drop packets intended for the client. So client can hang on handshake during reconnect.
.setConnectionTimeout(5000) // Server will drop packets intended for the client.
// So client can hang on handshake during reconnect.
.setRequestTimeout(5000)
.setRetryLimit(1) // Let's not retry operations if the channel was closed while waiting for a response.
.setEventListeners(new ConnectionEventListener() {
@Override public void onConnectionClosed(ConnectionClosedEvent event) {
Expand Down