From bf092f579cdc5f73ba199952054f4a5cc516a50c Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Thu, 14 Aug 2025 17:19:33 +0300 Subject: [PATCH 1/3] IGNITE-26209 Add metrics to improve node network unavailability detection --- .../internal/util/nio/GridNioServer.java | 6 + .../util/nio/GridSelectorNioSessionImpl.java | 32 +++ .../tcp/TcpCommunicationSpi.java | 6 + .../tcp/internal/GridNioServerWrapper.java | 38 +++- .../TcpCommunicationConfigInitializer.java | 27 +++ .../TcpCommunicationConfiguration.java | 17 ++ .../ignite/spi/discovery/tcp/ServerImpl.java | 19 ++ .../IgniteSlowClientDetectionSelfTest.java | 12 +- .../OutboundIoMessageQueueSizeTest.java | 191 ++++++++++++++++++ .../ClientReconnectContinuousQueryTest.java | 22 +- .../ignite/testframework/GridTestUtils.java | 16 ++ .../testsuites/IgniteCacheTestSuite13.java | 2 + 12 files changed, 356 insertions(+), 32 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 52c3fb053e204..39c40fcef8f29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -151,6 +151,12 @@ public class GridNioServer { /** */ public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Number of messages waiting to be sent"; + /** */ + public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME = "maxOutboundMessagesQueueSize"; + + /** */ + public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Maximum number of messages waiting to be sent"; + /** */ public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes"; diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 85e76900b36cf..2012d67c91266 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; +import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -38,6 +39,8 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName; +import static org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC; +import static org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME; import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC; import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME; @@ -91,6 +94,9 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr /** Outbound messages queue size metric. */ @Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric; + /** Maximum outbound messages queue size metric. */ + @Nullable private final MaxValueMetric maxMessagesQueueSizeMetric; + /** * Creates session instance. 
* @@ -148,6 +154,13 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME, OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC ); + + maxMessagesQueueSizeMetric = mreg == null ? null : mreg.maxValueMetric( + MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME, + MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC, + 60_000, + 5 + ); } /** {@inheritDoc} */ @@ -316,6 +329,14 @@ int offerSystemFuture(SessionWriteRequest writeFut) { if (outboundMessagesQueueSizeMetric != null) outboundMessagesQueueSizeMetric.increment(); + if (maxMessagesQueueSizeMetric != null) { + int queueSize = queue.sizex(); + + maxMessagesQueueSizeMetric.update(queueSize); + + return queueSize; + } + return queue.sizex(); } @@ -346,6 +367,14 @@ int offerFuture(SessionWriteRequest writeFut) { if (outboundMessagesQueueSizeMetric != null) outboundMessagesQueueSizeMetric.increment(); + if (maxMessagesQueueSizeMetric != null) { + int queueSize = queue.sizex(); + + maxMessagesQueueSizeMetric.update(queueSize); + + return queueSize; + } + return queue.sizex(); } @@ -361,6 +390,9 @@ void resend(Collection futs) { if (outboundMessagesQueueSizeMetric != null) outboundMessagesQueueSizeMetric.add(futs.size()); + + if (maxMessagesQueueSizeMetric != null) + maxMessagesQueueSizeMetric.update(futs.size()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2beea5091d1a7..73a15f33fd427 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -751,6 +751,12 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { ", slowClientQueueLimit=" + cfg.slowClientQueueLimit() + ']'); } + if (cfg.messageQueueWarningSize() > 0 && cfg.messageQueueLimit() > 0 && cfg.messageQueueWarningSize() 
>= cfg.messageQueueLimit()) { + U.quietAndWarn(log, "Message queue warning size is set to a value greater than or equal to message " + + "queue limit (message queue warning size will have no effect) [msgQueueLimit=" + cfg.messageQueueLimit() + + ", messageQueueWarningSize=" + cfg.messageQueueWarningSize() + ']'); + } + if (cfg.messageQueueLimit() == 0) U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " + "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java index d35471fa52a3f..e04fe6decf654 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java @@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; import javax.net.ssl.SSLEngine; @@ -130,6 +131,9 @@ public class GridNioServerWrapper { /** Default delay between reconnects attempts in case of temporary network issues. */ private static final int DFLT_RECONNECT_DELAY = 50; + /** Minimum frequency (in milliseconds) of high message queue size warning. */ + private static final long MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY = 30_000L; + /** Channel meta used for establishing channel connections. */ static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -220,6 +224,9 @@ public class GridNioServerWrapper { /** Executor for establishing a connection to a node. 
*/ private final TcpHandshakeExecutor tcpHandshakeExecutor; + /** Timestamp of the last high message queue size warning. */ + private final AtomicLong lastMsqQueueSizeWarningTs = new AtomicLong(); + /** * @param log Logger. * @param cfg Config. @@ -885,7 +892,9 @@ private MessageFactory get() { boolean clientMode = Boolean.TRUE.equals(igniteCfg.isClientMode()); IgniteBiInClosure queueSizeMonitor = - !clientMode && cfg.slowClientQueueLimit() > 0 ? this::checkClientQueueSize : null; + !clientMode && (cfg.slowClientQueueLimit() > 0 || cfg.messageQueueWarningSize() > 0) + ? cfg.messageQueueWarningSize() > 0 ? this::checkNodeQueueSize : this::checkClientQueueSize + : null; List filters = new ArrayList<>(); @@ -1250,6 +1259,33 @@ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { } } + /** + * Checks node message queue size and produce warning if message queue size exceeds the configured threshold. + * + * @param ses Node communication session. + * @param msgQueueSize Message queue size. + */ + private void checkNodeQueueSize(GridNioSession ses, int msgQueueSize) { + if (cfg.messageQueueWarningSize() > 0 && msgQueueSize > cfg.messageQueueWarningSize()) { + long lastWarnTs = lastMsqQueueSizeWarningTs.get(); + + if (U.currentTimeMillis() > lastWarnTs + MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY) { + if (lastMsqQueueSizeWarningTs.compareAndSet(lastWarnTs, U.currentTimeMillis())) { + ConnectionKey id = ses.meta(CONN_IDX_META); + if (id != null) { + String msg = "Outbound message queue size for node exceeded configured " + + "messageQueueWarningSize value, it may be caused by node failure or a network problems " + + "[node=" + id.nodeId() + ", msqQueueSize=" + msgQueueSize + ']'; + + log.warning(msg); + } + } + } + } + + checkClientQueueSize(ses, msgQueueSize); + } + /** * @param commWorker New recovery and idle clients handler. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java index 3aba3b1e7cddf..7ccde021dfb55 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java @@ -690,6 +690,33 @@ public int getMessageQueueLimit() { return cfg.messageQueueLimit(); } + /** + * Sets message queue size to print warning. + *

+ * When set to positive number warning will be produced when outgoing message queue size exceeds provided size. + * {@code 0} disables the warning. + *

+ * If not provided, default is 0 (do not print warning). + * + * @param msgQueueWarnSize Outgoing messages queue size to print warning. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public TcpCommunicationSpi setMessageQueueWarningSize(int msgQueueWarnSize) { + cfg.messageQueueWarningSize(msgQueueWarnSize); + + return (TcpCommunicationSpi)this; + } + + /** + * Gets outgoing messages queue size to print warning. + * + * @return Outgoing messages queue size to print warning. + */ + public int getMessageQueueWarningSize() { + return cfg.messageQueueWarningSize(); + } + /** * See {@link #setSlowClientQueueLimit(int)}. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java index 0aa57af568735..ee64aa0706b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java @@ -97,6 +97,9 @@ public class TcpCommunicationConfiguration implements Serializable { /** Message queue limit. */ private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; + /** Message queue size to pring warning. */ + private int msgQueueWarnSize; + /** Use paired connections. */ private boolean usePairedConnections; @@ -333,6 +336,20 @@ public void messageQueueLimit(int msgQueueLimit) { this.msgQueueLimit = msgQueueLimit; } + /** + * @return Message queue size to print warning. + */ + public int messageQueueWarningSize() { + return msgQueueWarnSize; + } + + /** + * @param msgQueueWarnSize New message queue size to print warning. + */ + public void messageQueueWarningSize(int msgQueueWarnSize) { + this.msgQueueWarnSize = msgQueueWarnSize; + } + /** * @return Use paired connections. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index e469d8572f31e..425cca97e4aa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -85,6 +85,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty; import org.apache.ignite.internal.processors.failure.FailureProcessor; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.security.SecurityUtils; import org.apache.ignite.internal.processors.tracing.Span; @@ -181,6 +183,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID; import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.CONN_DISABLED_BY_ADMIN_ERR_MSG; import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.newConnectionEnabledProperty; +import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS; import static org.apache.ignite.internal.processors.security.SecurityUtils.authenticateLocalNode; import static org.apache.ignite.internal.processors.security.SecurityUtils.withSecurityContext; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id; @@ -256,6 +259,9 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private StatisticsPrinter statsPrinter; + /** Metric for max message queue size. 
*/ + private MaxValueMetric maxMsgQueueSizeMetric; + /** Failed nodes (but still in topology). */ private final Map failedNodes = new HashMap<>(); @@ -491,6 +497,16 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { spiCtx.registerPort(tcpSrvr.port, TCP); + + MetricRegistryImpl discoReg = (MetricRegistryImpl)spiCtx.getOrCreateMetricRegistry(DISCO_METRICS); + + maxMsgQueueSizeMetric = discoReg.maxValueMetric("MaxMsgQueueSize", + "Max message queue size", 60_000L, 5); + + discoReg.register("Next", () -> { + TcpDiscoveryNode next = msgWorker != null ? msgWorker.next : null; + return next != null ? next.id() : null; + }, UUID.class, "Next in the ring node ID"); } /** {@inheritDoc} */ @@ -3102,6 +3118,9 @@ private void addToQueue(TcpDiscoveryAbstractMessage msg, boolean addFirst) { if (log.isDebugEnabled()) log.debug("Message has been added to a worker's queue: " + msg); } + + if (maxMsgQueueSizeMetric != null) + maxMsgQueueSizeMetric.update(queue.size()); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java index 4d3a4895f6e05..75f19e28c1cd8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java @@ -30,14 +30,10 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.nio.GridNioServer; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import 
org.apache.ignite.plugin.segmentation.SegmentationPolicy; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -154,13 +150,7 @@ public void testSlowClient() throws Exception { for (int i = 0; i < 100; i++) cache0.put(0, i); - GridIoManager ioMgr = slowClient.context().io(); - - TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0]; - - GridNioServer nioSrvr = ((GridNioServerWrapper)GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper")).nio(); - - GridTestUtils.setFieldValue(nioSrvr, "skipRead", true); + GridTestUtils.skipCommNioServerRead(slowClient, true); // Initiate messages for client. for (int i = 0; i < 100; i++) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java new file mode 100644 index 0000000000000..795fa3246124a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metric; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS; +import static org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME; +import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME; + +/** + * Test for discovery/communication outbound message queue size metrics. + */ +public class OutboundIoMessageQueueSizeTest extends GridCommonAbstractTest { + /** */ + private static final int MSG_LIMIT = 50; + + /** */ + private final ListeningTestLogger log = new ListeningTestLogger(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setMessageQueueWarningSize(MSG_LIMIT); + + cfg.setDiscoverySpi(new BlockTcpDiscoverySpi().setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder())); + cfg.setCommunicationSpi(commSpi); + cfg.setGridLogger(log); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testCommunicationMsgQueue() throws Exception { + IgniteEx srv0 = startGrid(0); + IgniteEx srv1 = startGrid(1); + + String logMsg = "Outbound message queue size for node exceeded"; + + // Only one message to log should be printed due to throttling. + LogListener logLsnr = LogListener.matches(logMsg).times(1).build(); + + log.registerListener(logLsnr); + + IgniteCache cache0 = srv0.getOrCreateCache(DEFAULT_CACHE_NAME); + IgniteCache cache1 = srv1.getOrCreateCache(DEFAULT_CACHE_NAME); + + cache1.query(new ContinuousQuery<>().setLocalListener(evt -> {})); + + cache0.put(0, 0); + + assertFalse(logLsnr.check()); + + MaxValueMetric metric = srv0.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME) + .findMetric(MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME); + + assertTrue(metric.value() < MSG_LIMIT); + + GridTestUtils.skipCommNioServerRead(srv1, true); + + // Initiate messages for srv1. 
+ // Some messages still may be sent until buffers overflow, so use MSG_LIMIT * 2 messages. + for (int i = 0; i < MSG_LIMIT * 2; i++) + cache0.put(0, new byte[10 * 1024]); + + assertTrue(metric.value() >= MSG_LIMIT); + + assertTrue(logLsnr.check()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDiscoveryMsgQueue() throws Exception { + IgniteEx srv0 = startGrids(2); + + MaxValueMetric metric = srv0.context().metric().registry(DISCO_METRICS) + .findMetric("MaxMsgQueueSize"); + + metric.reset(); // Reset value accumulated before discovery SPI startup. + + srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage(IgniteUuid.randomUuid())); + + // Assume our message can be added to queue concurrently with other messages + // (for example, with metrics update message). + assertTrue(metric.value() < 3); + + BlockTcpDiscoverySpi discoverySpi = (BlockTcpDiscoverySpi)srv0.context().config().getDiscoverySpi(); + + CountDownLatch latch = new CountDownLatch(1); + + discoverySpi.setClosure((node, msg) -> { + U.awaitQuiet(latch); + + return null; + }); + + try { + for (int i = 0; i <= MSG_LIMIT; i++) + srv0.context().discovery().sendCustomEvent(new DummyCustomDiscoveryMessage(IgniteUuid.randomUuid())); + + assertTrue(metric.value() >= MSG_LIMIT); + } + finally { + latch.countDown(); + } + } + + /** */ + private static class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage { + /** */ + private final IgniteUuid id; + + /** + * @param id Message id. 
+ */ + DummyCustomDiscoveryMessage(IgniteUuid id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, + DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java index 079869e812a3e..f3e788b654fb2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java @@ -31,12 +31,8 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.communication.GridIoManager; -import org.apache.ignite.internal.util.nio.GridNioServer; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -122,13 +118,13 @@ public void testClientReconnect() throws Exception { assertTrue(updaterReceived.await(10_000, TimeUnit.MILLISECONDS)); - 
skipRead(client, true); + GridTestUtils.skipCommNioServerRead(client, true); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { @Override public Void call() throws Exception { assertTrue(disconLatch.await(10_000, TimeUnit.MILLISECONDS)); - skipRead(client, false); + GridTestUtils.skipCommNioServerRead(client, false); return null; } @@ -203,18 +199,4 @@ private void putSomeKeys(int cnt) { for (int i = 0; i < cnt; i++) srvCache.put(0, i); } - - /** - * @param igniteClient Ignite client. - * @param skip Skip. - */ - private void skipRead(IgniteEx igniteClient, boolean skip) { - GridIoManager ioMgr = igniteClient.context().io(); - - TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0]; - - GridNioServer nioSrvr = ((GridNioServerWrapper)U.field(commSpi, "nioSrvWrapper")).nio(); - - GridTestUtils.setFieldValue(nioSrvr, "skipRead", skip); - } } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index 1613fdb8fba0f..62e450ee10c39 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -112,6 +112,7 @@ import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.lang.gridfunc.NoOpClosure; +import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.T2; @@ -122,6 +123,8 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import 
org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper; import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -2644,4 +2647,17 @@ public static long sleep_and_can_fail(long sleepMs, boolean fail) { public static void suppressException(RunnableX runnableX) { runnableX.run(); } + + /** + * @param ignite Ignite instance. + * @param skip Skip. + */ + @SuppressWarnings("deprecation") + public static void skipCommNioServerRead(IgniteEx ignite, boolean skip) { + TcpCommunicationSpi commSpi = (TcpCommunicationSpi)(ignite.context().config().getCommunicationSpi()); + + GridNioServer nioSrvr = ((GridNioServerWrapper)U.field(commSpi, "nioSrvWrapper")).nio(); + + setFieldValue(nioSrvr, "skipRead", skip); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java index 8b9b8afe183db..c7c4200a0095a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.metric.LogExporterSpiTest; import org.apache.ignite.internal.metric.MetricsConfigurationTest; import org.apache.ignite.internal.metric.MetricsSelfTest; +import org.apache.ignite.internal.metric.OutboundIoMessageQueueSizeTest; import org.apache.ignite.internal.metric.ReadMetricsOnNodeStartupTest; import org.apache.ignite.internal.metric.SystemMetricsTest; import org.apache.ignite.internal.metric.SystemViewCacheExpiryPolicyTest; @@ -81,6 +82,7 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, IoStatisticsCacheSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IoStatisticsSelfTest.class, ignoredTests); 
GridTestUtils.addTestIfNeeded(suite, IoStatisticsMetricsLocalMXBeanImplSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, OutboundIoMessageQueueSizeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, MetricsSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, SystemMetricsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CustomMetricsTest.class, ignoredTests); From 02fc62b4c5c38b160c70b36b5ff2374f0b14495c Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Tue, 23 Dec 2025 19:13:54 +0300 Subject: [PATCH 2/3] IGNITE-26209 Review comments fixed --- .../communication/tcp/internal/GridNioServerWrapper.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java index e04fe6decf654..8fda03d8103ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java @@ -1273,11 +1273,9 @@ private void checkNodeQueueSize(GridNioSession ses, int msgQueueSize) { if (lastMsqQueueSizeWarningTs.compareAndSet(lastWarnTs, U.currentTimeMillis())) { ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { - String msg = "Outbound message queue size for node exceeded configured " + + log.warning("Outbound message queue size for node exceeded configured " + "messageQueueWarningSize value, it may be caused by node failure or a network problems " + - "[node=" + id.nodeId() + ", msqQueueSize=" + msgQueueSize + ']'; - - log.warning(msg); + "[node=" + id.nodeId() + ", msqQueueSize=" + msgQueueSize + ']'); } } } From f06ef1a60d5617ca399c3cc82ab22f84c2129d94 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Fri, 26 Dec 2025 16:46:33 +0300 Subject: 
[PATCH 3/3] IGNITE-26209 Review comments fixed --- .../apache/ignite/IgniteSystemProperties.java | 9 +++++++ .../tcp/TcpCommunicationSpi.java | 6 ----- .../tcp/internal/GridNioServerWrapper.java | 12 ++++++--- .../TcpCommunicationConfigInitializer.java | 27 ------------------- .../TcpCommunicationConfiguration.java | 17 ------------ .../OutboundIoMessageQueueSizeTest.java | 9 +++---- 6 files changed, 20 insertions(+), 60 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index b8be84ddb73f2..1f8916124c740 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1824,6 +1824,15 @@ public final class IgniteSystemProperties extends IgniteCommonsSystemProperties "when value IgniteConfiguration#getLocalHost is ip, for backward compatibility") public static final String IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES = "IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES"; + /** + * When set to positive number warning will be produced when outgoing message queue size of TCP communication SPI + * exceeds provided value. + * Default is {@code 0} (do not print warning). + */ + @SystemProperty(value = "When set to positive number warning will be produced when outgoing message queue size of " + + "TCP communication SPI exeeds provided value. Default is 0 (do not print warning).", type = Integer.class) + public static final String IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE = "IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE"; + /** * When above zero, prints tx key collisions once per interval.
* Each transaction besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 73a15f33fd427..2beea5091d1a7 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -751,12 +751,6 @@ private void dumpInfo(StringBuilder sb, UUID dstNodeId) { ", slowClientQueueLimit=" + cfg.slowClientQueueLimit() + ']'); } - if (cfg.messageQueueWarningSize() > 0 && cfg.messageQueueLimit() > 0 && cfg.messageQueueWarningSize() >= cfg.messageQueueLimit()) { - U.quietAndWarn(log, "Message queue warning size is set to a value greater than or equal to message " + - "queue limit (message queue warning size will have no effect) [msgQueueLimit=" + cfg.messageQueueLimit() + - ", messageQueueWarningSize=" + cfg.messageQueueWarningSize() + ']'); - } - if (cfg.messageQueueLimit() == 0) U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " + "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " + diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java index 8fda03d8103ef..f299f542ede12 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java @@ -206,9 +206,13 @@ public class GridNioServerWrapper { private volatile ThrowableSupplier socketChannelFactory = SocketChannel::open; /** Enable forcible node kill. 
*/ - private boolean forcibleNodeKillEnabled = IgniteSystemProperties + private final boolean forcibleNodeKillEnabled = IgniteSystemProperties .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL); + /** Message queue size to print warning. */ + private final int msgQueueWarningSize = IgniteSystemProperties.getInteger( + IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, 0); + /** NIO server. */ private GridNioServer nioSrv; @@ -892,8 +896,8 @@ private MessageFactory get() { boolean clientMode = Boolean.TRUE.equals(igniteCfg.isClientMode()); IgniteBiInClosure queueSizeMonitor = - !clientMode && (cfg.slowClientQueueLimit() > 0 || cfg.messageQueueWarningSize() > 0) - ? cfg.messageQueueWarningSize() > 0 ? this::checkNodeQueueSize : this::checkClientQueueSize + !clientMode && (cfg.slowClientQueueLimit() > 0 || msgQueueWarningSize > 0) + ? msgQueueWarningSize > 0 ? this::checkNodeQueueSize : this::checkClientQueueSize : null; List filters = new ArrayList<>(); @@ -1266,7 +1270,7 @@ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { * @param msgQueueSize Message queue size. 
*/ private void checkNodeQueueSize(GridNioSession ses, int msgQueueSize) { - if (cfg.messageQueueWarningSize() > 0 && msgQueueSize > cfg.messageQueueWarningSize()) { + if (msgQueueWarningSize > 0 && msgQueueSize > msgQueueWarningSize) { long lastWarnTs = lastMsqQueueSizeWarningTs.get(); if (U.currentTimeMillis() > lastWarnTs + MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java index 7ccde021dfb55..3aba3b1e7cddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfigInitializer.java @@ -690,33 +690,6 @@ public int getMessageQueueLimit() { return cfg.messageQueueLimit(); } - /** - * Sets message queue size to print warning. - *

- * When set to positive number warning will be produced when outgoing message queue size exeeds provided size. - * {@code 0} disables the warning. - *

- * If not provided, default is 0 (do not print warning). - * - * @param msgQueueWarnSize Outgoing messages queue size to print warning. - * @return {@code this} for chaining. - */ - @IgniteSpiConfiguration(optional = true) - public TcpCommunicationSpi setMessageQueueWarningSize(int msgQueueWarnSize) { - cfg.messageQueueWarningSize(msgQueueWarnSize); - - return (TcpCommunicationSpi)this; - } - - /** - * Gets outgoing messages queue size to print warning. - * - * @return Outgoing messages queue size to print warning. - */ - public int getMessageQueueWarningSize() { - return cfg.messageQueueWarningSize(); - } - /** * See {@link #setSlowClientQueueLimit(int)}. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java index ee64aa0706b3f..0aa57af568735 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConfiguration.java @@ -97,9 +97,6 @@ public class TcpCommunicationConfiguration implements Serializable { /** Message queue limit. */ private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; - /** Message queue size to pring warning. */ - private int msgQueueWarnSize; - /** Use paired connections. */ private boolean usePairedConnections; @@ -336,20 +333,6 @@ public void messageQueueLimit(int msgQueueLimit) { this.msgQueueLimit = msgQueueLimit; } - /** - * @return Message queue size to print warning. - */ - public int messageQueueWarningSize() { - return msgQueueWarnSize; - } - - /** - * @param msgQueueWarnSize New message queue size to print warning. - */ - public void messageQueueWarningSize(int msgQueueWarnSize) { - this.msgQueueWarnSize = msgQueueWarnSize; - } - /** * @return Use paired connections. 
*/ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java index 795fa3246124a..28264179cef4c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/OutboundIoMessageQueueSizeTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; @@ -29,12 +30,12 @@ import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.Test; @@ -57,12 +58,7 @@ public class OutboundIoMessageQueueSizeTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setMessageQueueWarningSize(MSG_LIMIT); - cfg.setDiscoverySpi(new 
BlockTcpDiscoverySpi().setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder())); - cfg.setCommunicationSpi(commSpi); cfg.setGridLogger(log); return cfg; @@ -79,6 +75,7 @@ public class OutboundIoMessageQueueSizeTest extends GridCommonAbstractTest { * @throws Exception If failed. */ @Test + @WithSystemProperty(key = IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, value = "" + MSG_LIMIT) public void testCommunicationMsgQueue() throws Exception { IgniteEx srv0 = startGrid(0); IgniteEx srv1 = startGrid(1);