From 8d07f0fb07b8b419beefbba0f9df8af32a9c6827 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 30 Dec 2025 19:40:11 +0300 Subject: [PATCH 1/5] raw --- .../discovery/DiscoveryMessageFactory.java | 13 + .../cluster/ClusterNodeMetrics.java | 6 +- .../cluster/NodeFullMetricsMessage.java | 36 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 4 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 36 +- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 48 ++- .../TcpDiscoveryCacheMetricsMessage.java | 50 +++ .../TcpDiscoveryMetricsUpdateMessage.java | 319 +++++------------- .../TcpDiscoveryNodeFullMetricsMessage.java | 60 ++++ .../TcpDiscoveryNodeMetricsMessage.java | 50 +++ .../TcpDiscoveryNodesMetricsMapMessage.java | 57 ++++ .../cache/CacheMetricsCacheSizeTest.java | 11 +- 12 files changed, 406 insertions(+), 284 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1c01592fd0dd5..b44e661b83900 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -20,6 +20,7 @@ import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; @@ -29,6 +30,9 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodesMetricsMapMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -37,6 +41,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; @@ -46,6 +51,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -54,6 +62,11 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, + new TcpDiscoveryNodeFullMetricsMessageSerializer()); + factory.register((short)-104, TcpDiscoveryNodesMetricsMapMessage::new, new TcpDiscoveryNodesMetricsMapMessageSerializer()); + factory.register((short)-103, TcpDiscoveryCacheMetricsMessage::new, new TcpDiscoveryCacheMetricsMessageSerializer()); + factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer()); factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java index a1ac2dc3c8c47..7bba244fc7fc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -46,11 +46,11 @@ class ClusterNodeMetrics { /** */ public ClusterNodeMetrics(NodeFullMetricsMessage msg) { - nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMsg()); + nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMessage()); - cacheMetrics = new HashMap<>(msg.cachesMetrics().size(), 1.0f); + cacheMetrics = new HashMap<>(msg.cachesMetricsMessages().size(), 1.0f); - msg.cachesMetrics().entrySet().forEach(e -> cacheMetrics.put(e.getKey(), new CacheMetricsSnapshot(e.getValue()))); + msg.cachesMetricsMessages().forEach((key, value) -> cacheMetrics.put(key, new CacheMetricsSnapshot(value))); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java index 2974ab93351a6..42707f22b91ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java @@ -27,17 +27,17 @@ import org.apache.ignite.plugin.extensions.communication.Message; /** Node compound metrics message. */ -public final class NodeFullMetricsMessage implements Message { +public class NodeFullMetricsMessage implements Message { /** */ public static final short TYPE_CODE = 138; /** Node metrics wrapper message. */ - @Order(0) + @Order(value = 0, method = "nodeMetricsMessage") private NodeMetricsMessage nodeMetricsMsg; /** Cache metrics wrapper message. */ - @Order(1) - private Map cachesMetrics; + @Order(value = 1, method = "cachesMetricsMessages") + private Map cachesMetricsMsgs; /** Empty constructor for {@link GridIoMessageFactory}. */ public NodeFullMetricsMessage() { @@ -46,30 +46,40 @@ public NodeFullMetricsMessage() { /** */ public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map cacheMetrics) { - nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics); + nodeMetricsMsg = createNodeMetricsMessage(nodeMetrics); - cachesMetrics = new HashMap<>(cacheMetrics.size(), 1.0f); + cachesMetricsMsgs = new HashMap<>(cacheMetrics.size(), 1.0f); - cacheMetrics.forEach((key, value) -> cachesMetrics.put(key, new CacheMetricsMessage(value))); + cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, createCacheMetricsMessage(value))); } /** */ - public Map cachesMetrics() { - return cachesMetrics; + protected NodeMetricsMessage createNodeMetricsMessage(ClusterMetrics nodeMetrics) { + return new NodeMetricsMessage(nodeMetrics); } /** */ - public void cachesMetrics(Map cacheMetricsMsg) { - cachesMetrics = cacheMetricsMsg; + protected CacheMetricsMessage createCacheMetricsMessage(CacheMetrics cacheMetrics) { + return new CacheMetricsMessage(cacheMetrics); } /** */ - public NodeMetricsMessage nodeMetricsMsg() { + public Map cachesMetricsMessages() { + return cachesMetricsMsgs; + } + + /** */ + public void cachesMetricsMessages(Map cacheMetricsMsg) { + cachesMetricsMsgs = cacheMetricsMsg; + } + + /** */ + public NodeMetricsMessage nodeMetricsMessage() { return nodeMetricsMsg; } /** */ - public void nodeMetricsMsg(NodeMetricsMessage nodeMetricsMsg) { + public void nodeMetricsMessage(NodeMetricsMessage nodeMetricsMsg) { this.nodeMetricsMsg = nodeMetricsMsg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 5ecbe893fb852..4a5d3aad7af99 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2526,8 +2526,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { log.debug("Received metrics response: " + msg); } else { - if (msg.hasMetrics()) - processMsgCacheMetrics(msg, System.nanoTime()); + if (!F.isEmpty(msg.serversFullMetricsMessages())) + processCacheMetrics(msg, System.nanoTime()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index aef67a57809d8..2f9e93f6a64c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2208,8 +2208,7 @@ private boolean recordable(TcpDiscoveryAbstractMessage msg) { * @param nodeId Node ID. */ private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) { - msg.removeMetrics(nodeId); - msg.removeCacheMetrics(nodeId); + msg.removeServerMetrics(nodeId); } /** {@inheritDoc} */ @@ -6032,14 +6031,14 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { long tsNanos = System.nanoTime(); - if (spiStateCopy() == CONNECTED && msg.hasMetrics()) - processMsgCacheMetrics(msg, tsNanos); + if (spiStateCopy() == CONNECTED && !F.isEmpty(msg.serversFullMetricsMessages())) + processCacheMetrics(msg, tsNanos); if (sendMessageToRemotes(msg)) { if (laps == 0 && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. - msg.setMetrics(locNodeId, spi.metricsProvider.metrics()); - msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics()); + msg.addServerMetrics(locNodeId, spi.metricsProvider.metrics()); + msg.addServerCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics()); for (Map.Entry e : clientMsgWorkers.entrySet()) { UUID nodeId = e.getKey(); @@ -6047,15 +6046,15 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { if (metrics != null) msg.setClientMetrics(locNodeId, nodeId, metrics); - - msg.addClientNodeId(nodeId); } } else { // Message is on its second ring. - removeMetrics(msg, locNodeId); + msg.removeServerMetrics(locNodeId); - Collection clientNodeIds = msg.clientNodeIds(); + Collection clientNodeIds = F.isEmpty(msg.connectedClientsMetricsMessages()) + ? Collections.emptySet() + : msg.connectedClientsMetricsMessages().keySet(); for (TcpDiscoveryNode clientNode : ring.clientNodes()) { if (clientNode.visible()) { @@ -6065,9 +6064,7 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { if (!clientNode.clientAliveTimeSet()) clientNode.clientAliveTime(spi.clientFailureDetectionTimeout()); - boolean aliveCheck = clientNode.isClientAlive(); - - if (!aliveCheck && isLocalNodeCoordinator()) { + if (!clientNode.isClientAlive() && isLocalNodeCoordinator()) { boolean failedNode; synchronized (mux) { @@ -8416,7 +8413,8 @@ else if (laps == 1) { private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) { UUID locNodeId = getLocalNodeId(); - boolean hasLocMetrics = hasMetrics(msg, locNodeId); + boolean hasLocMetrics = !F.isEmpty(msg.serversFullMetricsMessages()) + && msg.serversFullMetricsMessages().get(locNodeId) != null; if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && msg.senderNodeId() != null) return 2; @@ -8425,15 +8423,5 @@ else if (msg.senderNodeId() == null || !hasLocMetrics) else return 1; } - - /** - * @param msg Metrics update message to check. - * @param nodeId Node ID for which the check should be performed. - * @return {@code True} is the message contains metrics of the node with the provided ID. - * {@code False} otherwise. - */ - private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) { - return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId); - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index f35f411479501..2e0d4c3d0c0b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -35,10 +35,14 @@ import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.processors.tracing.NoopTracing; import org.apache.ignite.internal.processors.tracing.Tracing; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -49,6 +53,8 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; import org.jetbrains.annotations.Nullable; @@ -415,27 +421,45 @@ protected void clearNodeSensitiveData(TcpDiscoveryNode node) { } /** */ - public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { - for (Map.Entry e : msg.metrics().entrySet()) { - UUID nodeId = e.getKey(); + public void processCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { + for (Map.Entry e : msg.serversFullMetricsMessages().entrySet()) { + UUID srvrId = e.getKey(); + Map cacheMetricsMsgs = e.getValue().cachesMetricsMessages(); + NodeMetricsMessage srvrMetricsMsg = e.getValue().nodeMetricsMessage(); - TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); + assert srvrMetricsMsg != null; - Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? - msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + Map cacheMetrics; - if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() - && cacheMetrics.size() >= METRICS_QNT_WARN) { + if (!F.isEmpty(cacheMetricsMsgs)) { + cacheMetrics = U.newHashMap(cacheMetricsMsgs.size()); + + cacheMetricsMsgs.forEach((cacheId, cacheMetricsMsg) -> + cacheMetrics.put(cacheId, new CacheMetricsSnapshot(cacheMetricsMsg))); + } + else + cacheMetrics = Collections.emptyMap(); + + if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() && cacheMetrics.size() >= METRICS_QNT_WARN) { log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" + "To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option."); endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT; } - updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos); + updateMetrics(srvrId, new ClusterMetricsSnapshot(srvrMetricsMsg), cacheMetrics, tsNanos); + + TcpDiscoveryNodesMetricsMapMessage clientsMetricsMsg = F.isEmpty(msg.connectedClientsMetricsMessages()) + ? null + : msg.connectedClientsMetricsMessages().get(srvrId); + + if (clientsMetricsMsg == null) + return; + + assert clientsMetricsMsg.nodesMetricsMessages() != null; - for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos); + clientsMetricsMsg.nodesMetricsMessages().forEach((clientId, clientNodeMetricsMsg) -> + updateMetrics(clientId, new ClusterMetricsSnapshot(clientNodeMetricsMsg), cacheMetrics, tsNanos)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java new file mode 100644 index 0000000000000..fc35b28bf0da6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link CacheMetricsMessage} in Discovery as it is registered in a message factory of + * Communication component and thus is unavailable in Discovery. We have to extend {@link CacheMetricsMessage} and + * register this subclass in message factory of Discovery component. + */ +public class TcpDiscoveryCacheMetricsMessage extends CacheMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryCacheMetricsMessage() { + // No-op. + } + + /** */ + public TcpDiscoveryCacheMetricsMessage(CacheMetrics m) { + super(m); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -103; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryCacheMetricsMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java index e6835320fa5b2..7e81f9e433ac2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java @@ -17,23 +17,16 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.UUID; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -56,16 +49,20 @@ public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessag /** */ private static final long serialVersionUID = 0L; - /** Map to store nodes metrics. */ + /** Connected clients metrics: server id -> map of clients metrics. */ @GridToStringExclude - private final Map metrics = new HashMap<>(); + @Order(value = 5, method = "connectedClientsMetricsMessages") + private Map connectedClientsMetricsMsgs; - /** Client node IDs. */ - private final Collection clientNodeIds = new HashSet<>(); - - /** Cahce metrics by node. */ + /** Servers full metrics: server id -> node metrics + node's caches metrics. */ @GridToStringExclude - private final Map> cacheMetrics = new HashMap<>(); + @Order(value = 7, method = "serversFullMetricsMessages") + private Map serversFullMetricsMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryMetricsUpdateMessage() { + // No-op. + } /** * Constructor. @@ -77,141 +74,119 @@ public TcpDiscoveryMetricsUpdateMessage(UUID creatorNodeId) { } /** - * Sets metrics for particular node. + * Sets metrics for particular node. Supposed to be called before {@link #addServerCacheMetrics(UUID, Map)}. * - * @param nodeId Node ID. - * @param metrics Node metrics. + * @param srvrId Server ID. + * @param newMetrics New server metrics to add. */ - public void setMetrics(UUID nodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert metrics != null; - assert !this.metrics.containsKey(nodeId); + public void addServerMetrics(UUID srvrId, ClusterMetrics newMetrics) { + assert srvrId != null; + assert newMetrics != null; - this.metrics.put(nodeId, new MetricsSet(metrics)); - } + if (serversFullMetricsMsgs == null) + serversFullMetricsMsgs = new HashMap<>(); - /** - * Sets cache metrics for particular node. - * - * @param nodeId Node ID. - * @param metrics Node cache metrics. - */ - public void setCacheMetrics(UUID nodeId, Map metrics) { - assert nodeId != null; - assert metrics != null; - assert !this.cacheMetrics.containsKey(nodeId); + assert !serversFullMetricsMsgs.containsKey(srvrId); - if (!F.isEmpty(metrics)) - this.cacheMetrics.put(nodeId, metrics); - } + serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> { + if (srvrFullMetrics == null) + srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage(); - /** - * Sets metrics for a client node. - * - * @param nodeId Server node ID. - * @param clientNodeId Client node ID. - * @param metrics Node metrics. - */ - public void setClientMetrics(UUID nodeId, UUID clientNodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert clientNodeId != null; - assert metrics != null; - assert this.metrics.containsKey(nodeId); + srvrFullMetrics.nodeMetricsMessage(new NodeMetricsMessage(newMetrics)); - this.metrics.get(nodeId).addClientMetrics(clientNodeId, metrics); + return srvrFullMetrics; + }); } /** - * Removes metrics for particular node from the message. + * Sets cache metrics for particular node. Supposed to be called after {@link #addServerCacheMetrics(UUID, Map)}. * - * @param nodeId Node ID. + * @param srvrId Server ID. + * @param newCachesMetrics News server's caches metrics to add. */ - public void removeMetrics(UUID nodeId) { - assert nodeId != null; + public void addServerCacheMetrics(UUID srvrId, Map newCachesMetrics) { + assert srvrId != null; + assert newCachesMetrics != null; - metrics.remove(nodeId); - } + if (serversFullMetricsMsgs == null) + serversFullMetricsMsgs = new HashMap<>(); - /** - * Removes cache metrics for particular node from the message. - * - * @param nodeId Node ID. - */ - public void removeCacheMetrics(UUID nodeId) { - assert nodeId != null; + assert serversFullMetricsMsgs.containsKey(srvrId) && serversFullMetricsMsgs.get(srvrId).cachesMetricsMessages() == null; - cacheMetrics.remove(nodeId); - } + serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> { + if (srvrFullMetrics == null) + srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage(); - /** - * Gets metrics map. - * - * @return Metrics map. - */ - public Map metrics() { - return metrics; + Map newCachesMsgsMap = U.newHashMap(newCachesMetrics.size()); + + newCachesMetrics.forEach((cacheId, cacheMetrics) -> + newCachesMsgsMap.put(cacheId, new CacheMetricsMessage(cacheMetrics))); + + srvrFullMetrics.cachesMetricsMessages(newCachesMsgsMap); + + return srvrFullMetrics; + }); } /** - * Gets cache metrics map. + * Sets metrics for a connected client node. * - * @return Cache metrics map. + * @param srvrId Server node ID. + * @param clientNodeId Connected client node ID. + * @param clientMetrics Client metrics. */ - public Map> cacheMetrics() { - return cacheMetrics; - } + public void setClientMetrics(UUID srvrId, UUID clientNodeId, ClusterMetrics clientMetrics) { + assert srvrId != null; + assert clientNodeId != null; + assert clientMetrics != null; - /** - * @return {@code True} if this message contains metrics. - */ - public boolean hasMetrics() { - return !metrics.isEmpty(); - } + assert serversFullMetricsMsgs.containsKey(srvrId); - /** - * @return {@code True} this message contains cache metrics. - */ - public boolean hasCacheMetrics() { - return !cacheMetrics.isEmpty(); - } + assert !connectedClientsMetricsMsgs.containsKey(srvrId) + || connectedClientsMetricsMsgs.get(srvrId).nodesMetricsMessages().get(clientNodeId) == null; - /** - * @param nodeId Node ID. - * @return {@code True} if this message contains metrics. - */ - public boolean hasMetrics(UUID nodeId) { - assert nodeId != null; + connectedClientsMetricsMsgs.compute(srvrId, (srvrId0, clientsMetricsMsg) -> { + if (clientsMetricsMsg == null) { + clientsMetricsMsg = new TcpDiscoveryNodesMetricsMapMessage(); + clientsMetricsMsg.nodesMetricsMessages(new HashMap<>()); + } - return metrics.get(nodeId) != null; + clientsMetricsMsg.nodesMetricsMessages().put(clientNodeId, new TcpDiscoveryNodeMetricsMessage(clientMetrics)); + + return clientsMetricsMsg; + }); } /** - * @param nodeId Node ID. + * Removes metrics for particular server node from the message. * - * @return {@code True} if this message contains cache metrics for particular node. + * @param srvrId Server ID. */ - public boolean hasCacheMetrics(UUID nodeId) { - assert nodeId != null; + public void removeServerMetrics(UUID srvrId) { + assert srvrId != null; + assert serversFullMetricsMsgs != null; - return cacheMetrics.get(nodeId) != null; + serversFullMetricsMsgs.remove(srvrId); } - /** - * Gets client node IDs for particular node. - * - * @return Client node IDs. - */ - public Collection clientNodeIds() { - return clientNodeIds; + /** */ + public Map serversFullMetricsMessages() { + return serversFullMetricsMsgs; } - /** - * Adds client node ID. - * - * @param clientNodeId Client node ID. - */ - public void addClientNodeId(UUID clientNodeId) { - clientNodeIds.add(clientNodeId); + /** */ + public void serversFullMetricsMessages(Map serversFullMetricsMsgs) { + this.serversFullMetricsMsgs = serversFullMetricsMsgs; + } + + /** */ + public Map connectedClientsMetricsMessages() { + return connectedClientsMetricsMsgs; + } + + /** */ + public void connectedClientsMetricsMessages(Map connectedClientsMetricsMsgs) { + this.connectedClientsMetricsMsgs = connectedClientsMetricsMsgs; } /** {@inheritDoc} */ @@ -223,112 +198,4 @@ public void addClientNodeId(UUID clientNodeId) { @Override public String toString() { return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, "super", super.toString()); } - - /** - * @param nodeId Node ID. - * @param metrics Metrics. - * @return Serialized metrics. - */ - private static byte[] serializeMetrics(UUID nodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert metrics != null; - - byte[] buf = new byte[16 + ClusterMetricsSnapshot.METRICS_SIZE]; - - U.longToBytes(nodeId.getMostSignificantBits(), buf, 0); - U.longToBytes(nodeId.getLeastSignificantBits(), buf, 8); - - ClusterMetricsSnapshot.serialize(buf, 16, metrics); - - return buf; - } - - /** - */ - @SuppressWarnings("PublicInnerClass") - public static class MetricsSet implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Metrics. */ - private byte[] metrics; - - /** Client metrics. */ - private Collection clientMetrics; - - /** - */ - public MetricsSet() { - // No-op. - } - - /** - * @param metrics Metrics. - */ - public MetricsSet(ClusterMetrics metrics) { - assert metrics != null; - - this.metrics = ClusterMetricsSnapshot.serialize(metrics); - } - - /** - * @return Deserialized metrics. - */ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); - } - - /** - * @return Client metrics. - */ - public Collection> clientMetrics() { - return F.viewReadOnly(clientMetrics, new C1>() { - @Override public T2 apply(byte[] bytes) { - UUID nodeId = new UUID(U.bytesToLong(bytes, 0), U.bytesToLong(bytes, 8)); - - return new T2<>(nodeId, ClusterMetricsSnapshot.deserialize(bytes, 16)); - } - }); - } - - /** - * @param nodeId Client node ID. - * @param metrics Client metrics. - */ - private void addClientMetrics(UUID nodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert metrics != null; - - if (clientMetrics == null) - clientMetrics = new ArrayList<>(); - - clientMetrics.add(serializeMetrics(nodeId, metrics)); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, metrics); - - out.writeInt(clientMetrics != null ? clientMetrics.size() : -1); - - if (clientMetrics != null) { - for (byte[] arr : clientMetrics) - U.writeByteArray(out, arr); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - metrics = U.readByteArray(in); - - int clientMetricsSize = in.readInt(); - - if (clientMetricsSize >= 0) { - clientMetrics = new ArrayList<>(clientMetricsSize); - - for (int i = 0; i < clientMetricsSize; i++) - clientMetrics.add(U.readByteArray(in)); - } - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java new file mode 100644 index 0000000000000..9a18945e54d10 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; +import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link NodeFullMetricsMessage} in Discovery as it is registered in a message factory of + * Communication component and thus is unavailable in Discovery. We have to extend {@link NodeFullMetricsMessage} and + * register this subclass in message factory of Discovery component. + */ +public class TcpDiscoveryNodeFullMetricsMessage extends NodeFullMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeFullMetricsMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected NodeMetricsMessage createNodeMetricsMessage(ClusterMetrics nodeMetrics) { + return new TcpDiscoveryNodeMetricsMessage(nodeMetrics); + } + + /** {@inheritDoc} */ + @Override protected CacheMetricsMessage createCacheMetricsMessage(CacheMetrics cacheMetrics) { + return new TcpDiscoveryCacheMetricsMessage(cacheMetrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -105; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeFullMetricsMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java new file mode 100644 index 0000000000000..66590ed53fa5d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link NodeMetricsMessage} in Discovery as it is registered in a message factory of + * Communication component and thus is unavailable in Discovery. We have to extend {@link NodeMetricsMessage} and + * register this subclass in message factory of Discovery component. + */ +public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeMetricsMessage() { + // No-op. + } + + /** */ + public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) { + super(metrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -102; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java new file mode 100644 index 0000000000000..81541f769d430 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Several nodes metrics message. */ +public class TcpDiscoveryNodesMetricsMapMessage implements Message { + /** */ + @Order(value = 0, method = "nodesMetricsMessages") + private Map nodesMetricsMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodesMetricsMapMessage() { + // No-op. + } + + /** */ + public Map nodesMetricsMessages() { + return nodesMetricsMsgs; + } + + /** */ + public void nodesMetricsMessages(Map nodesMetricsMsgs) { + this.nodesMetricsMsgs = nodesMetricsMsgs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -104; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodesMetricsMapMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java index d21204e99e1f2..b084c5f4517ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -92,7 +93,7 @@ public void testCacheSize() throws Exception { TcpDiscoveryMetricsUpdateMessage msg = new TcpDiscoveryMetricsUpdateMessage(UUID.randomUUID()); - msg.setCacheMetrics(UUID.randomUUID(), cacheMetrics); + msg.addServerCacheMetrics(UUID.randomUUID(), cacheMetrics); Marshaller marshaller = marshaller(grid(0)); @@ -104,11 +105,13 @@ public void testCacheSize() throws Exception { TcpDiscoveryMetricsUpdateMessage msg2 = (TcpDiscoveryMetricsUpdateMessage)readObj; - Map cacheMetrics2 = msg2.cacheMetrics().values().iterator().next(); + Map cacheMetrics2 = msg2.serversFullMetricsMessages().values().iterator().next() + .cachesMetricsMessages(); - CacheMetrics cacheMetric2 = cacheMetrics2.values().iterator().next(); + CacheMetrics cacheMetric2 = new CacheMetricsSnapshot(cacheMetrics2.values().iterator().next()); - assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error, cacheSize is different", size, cacheMetric2.getCacheSize()); + assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error, cacheSize is different", size, + cacheMetric2.getCacheSize()); IgniteCache cacheNode1 = grid(1).cache(DEFAULT_CACHE_NAME); From 90c54589ee0b2e1cd349a2f570fddf2284567c59 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 30 Dec 2025 23:00:27 +0300 Subject: [PATCH 2/5] minor --- .../tcp/messages/TcpDiscoveryMetricsUpdateMessage.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java index 7e81f9e433ac2..50ed4975f5b2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; /** * Metrics update message. @@ -57,7 +58,7 @@ public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessag /** Servers full metrics: server id -> node metrics + node's caches metrics. */ @GridToStringExclude @Order(value = 7, method = "serversFullMetricsMessages") - private Map serversFullMetricsMsgs; + private @Nullable Map serversFullMetricsMsgs; /** Constructor for {@link DiscoveryMessageFactory}. */ public TcpDiscoveryMetricsUpdateMessage() { @@ -180,7 +181,7 @@ public void serversFullMetricsMessages(Map connectedClientsMetricsMessages() { + public @Nullable Map connectedClientsMetricsMessages() { return connectedClientsMetricsMsgs; } From 30a9f17ce553ba466825cd9e81eb43d52a87bd96 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 31 Dec 2025 11:45:58 +0300 Subject: [PATCH 3/5] codestyle --- .../tcp/messages/TcpDiscoveryCacheMetricsMessage.java | 6 +++--- .../tcp/messages/TcpDiscoveryMetricsUpdateMessage.java | 8 ++++---- .../tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java | 2 -- .../tcp/messages/TcpDiscoveryNodeMetricsMessage.java | 6 +++--- .../tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java | 4 ++-- 5 files changed, 12 insertions(+), 14 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java index fc35b28bf0da6..3fbfc2faa7164 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java @@ -33,9 +33,9 @@ public TcpDiscoveryCacheMetricsMessage() { // No-op. } - /** */ - public TcpDiscoveryCacheMetricsMessage(CacheMetrics m) { - super(m); + /** @param cacheMetricsMsg Cache metric message. */ + public TcpDiscoveryCacheMetricsMessage(CacheMetrics cacheMetricsMsg) { + super(cacheMetricsMsg); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java index 50ed4975f5b2e..172bcb5659618 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java @@ -170,22 +170,22 @@ public void removeServerMetrics(UUID srvrId) { serversFullMetricsMsgs.remove(srvrId); } - /** */ + /** @return Map of server full metrics messages. */ public Map serversFullMetricsMessages() { return serversFullMetricsMsgs; } - /** */ + /** @param serversFullMetricsMsgs Map of server full metrics messages. */ public void serversFullMetricsMessages(Map serversFullMetricsMsgs) { this.serversFullMetricsMsgs = serversFullMetricsMsgs; } - /** */ + /** @return Map of nodes metrics messages. */ public @Nullable Map connectedClientsMetricsMessages() { return connectedClientsMetricsMsgs; } - /** */ + /** @param connectedClientsMetricsMsgs Map of nodes metrics messages. */ public void connectedClientsMetricsMessages(Map connectedClientsMetricsMsgs) { this.connectedClientsMetricsMsgs = connectedClientsMetricsMsgs; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java index 9a18945e54d10..1bfa6cdd7cf0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java @@ -17,8 +17,6 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.util.HashMap; -import java.util.Map; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java index 66590ed53fa5d..f62b3df08728f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java @@ -33,9 +33,9 @@ public TcpDiscoveryNodeMetricsMessage() { // No-op. } - /** */ - public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) { - super(metrics); + /** @param nodeMetrics Node metrics. */ + public TcpDiscoveryNodeMetricsMessage(ClusterMetrics nodeMetrics) { + super(nodeMetrics); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java index 81541f769d430..fa774545a47be 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java @@ -35,12 +35,12 @@ public TcpDiscoveryNodesMetricsMapMessage() { // No-op. } - /** */ + /** @return Map of nodes metrics. */ public Map nodesMetricsMessages() { return nodesMetricsMsgs; } - /** */ + /** @param nodesMetricsMsgs Map of nodes metrics. */ public void nodesMetricsMessages(Map nodesMetricsMsgs) { this.nodesMetricsMsgs = nodesMetricsMsgs; } From 64f506d556c74a5d538345b5d21fec1f2c0a0a22 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 3 Jan 2026 01:59:32 +0300 Subject: [PATCH 4/5] message reading fix --- .../discovery/tcp/TcpDiscoveryIoSession.java | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,13 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -185,19 +184,47 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - do { - // Should be cleared before first operation. - msgBuf.clear(); + msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + do { + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; + + if (finished) + break; + + // We must keep the uprocessed bytes read from the socket. It won't return them again. + byte[] unprocessedTail = null; + + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); + } + + msgBuf.clear(); + + if (unprocessedTail != null) + msgBuf.put(unprocessedTail); + } + while (true); return (T)msg; } From 280619a2fd2e721912b2dfdd7c00cd3b39ce2b93 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 9 Jan 2026 13:12:55 +0300 Subject: [PATCH 5/5] fixes --- .../discovery/DiscoveryMessageFactory.java | 3 +++ .../cluster/NodeFullMetricsMessage.java | 20 ++++---------- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 9 ++++--- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +- .../TcpDiscoveryMetricsUpdateMessage.java | 26 ++++++++++++------- .../TcpDiscoveryNodeFullMetricsMessage.java | 14 ---------- .../TcpDiscoveryNodesMetricsMapMessage.java | 8 +++--- 8 files changed, 35 insertions(+), 49 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index b44e661b83900..dc176cd4540fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodesMetricsMapMessageSerializer; @@ -51,6 +52,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage; @@ -83,5 +85,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); + factory.register((short)13, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java index 42707f22b91ba..0ca3ee6800f6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.cluster; -import java.util.HashMap; import java.util.Map; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; /** Node compound metrics message. */ @@ -41,26 +41,16 @@ public class NodeFullMetricsMessage implements Message { /** Empty constructor for {@link GridIoMessageFactory}. */ public NodeFullMetricsMessage() { - + // No-op. } /** */ public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map cacheMetrics) { - nodeMetricsMsg = createNodeMetricsMessage(nodeMetrics); - - cachesMetricsMsgs = new HashMap<>(cacheMetrics.size(), 1.0f); + nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics); - cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, createCacheMetricsMessage(value))); - } + cachesMetricsMsgs = U.newHashMap(cacheMetrics.size()); - /** */ - protected NodeMetricsMessage createNodeMetricsMessage(ClusterMetrics nodeMetrics) { - return new NodeMetricsMessage(nodeMetrics); - } - - /** */ - protected CacheMetricsMessage createCacheMetricsMessage(CacheMetrics cacheMetrics) { - return new CacheMetricsMessage(cacheMetrics); + cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, new CacheMetricsMessage(value))); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 4a5d3aad7af99..238fdd13eabd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2527,7 +2527,7 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { } else { if (!F.isEmpty(msg.serversFullMetricsMessages())) - processCacheMetrics(msg, System.nanoTime()); + processCacheMetricsMessage(msg, System.nanoTime()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2f9e93f6a64c2..12b58668ca4da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3378,10 +3378,11 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { if (msgBytes == null) { try { - msgBytes = U.marshal(spi.marshaller(), msg); + msgBytes = clientMsgWorker.ses.serializeMessage(msg); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message: " + msg, e); + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message to a client: " + msg + ", client id: " + + clientMsgWorker.clientNodeId, e); break; } @@ -6032,7 +6033,7 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { long tsNanos = System.nanoTime(); if (spiStateCopy() == CONNECTED && !F.isEmpty(msg.serversFullMetricsMessages())) - processCacheMetrics(msg, tsNanos); + processCacheMetricsMessage(msg, tsNanos); if (sendMessageToRemotes(msg)) { if (laps == 0 && spiStateCopy() == CONNECTED) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 2e0d4c3d0c0b8..80b7879854606 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -421,7 +421,7 @@ protected void clearNodeSensitiveData(TcpDiscoveryNode node) { } /** */ - public void processCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { + public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { for (Map.Entry e : msg.serversFullMetricsMessages().entrySet()) { UUID srvrId = e.getKey(); Map cacheMetricsMsgs = e.getValue().cachesMetricsMessages(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java index 172bcb5659618..37787cc5e905b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java @@ -25,10 +25,10 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; -import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** @@ -46,18 +46,18 @@ * second pass). */ @TcpDiscoveryRedirectToClient -public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; - /** Connected clients metrics: server id -> map of clients metrics. */ + /** Connected clients metrics: server id -> client id -> clients metrics. */ @GridToStringExclude @Order(value = 5, method = "connectedClientsMetricsMessages") private Map connectedClientsMetricsMsgs; - /** Servers full metrics: server id -> node metrics + node's caches metrics. */ + /** Servers full metrics: server id -> server metrics + metrics of server's caches. */ @GridToStringExclude - @Order(value = 7, method = "serversFullMetricsMessages") + @Order(value = 6, method = "serversFullMetricsMessages") private @Nullable Map serversFullMetricsMsgs; /** Constructor for {@link DiscoveryMessageFactory}. */ @@ -75,7 +75,7 @@ public TcpDiscoveryMetricsUpdateMessage(UUID creatorNodeId) { } /** - * Sets metrics for particular node. Supposed to be called before {@link #addServerCacheMetrics(UUID, Map)}. + * Sets metrics for particular node. * * @param srvrId Server ID. * @param newMetrics New server metrics to add. @@ -93,14 +93,14 @@ public void addServerMetrics(UUID srvrId, ClusterMetrics newMetrics) { if (srvrFullMetrics == null) srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage(); - srvrFullMetrics.nodeMetricsMessage(new NodeMetricsMessage(newMetrics)); + srvrFullMetrics.nodeMetricsMessage(new TcpDiscoveryNodeMetricsMessage(newMetrics)); return srvrFullMetrics; }); } /** - * Sets cache metrics for particular node. Supposed to be called after {@link #addServerCacheMetrics(UUID, Map)}. + * Sets cache metrics for particular node. * * @param srvrId Server ID. * @param newCachesMetrics News server's caches metrics to add. @@ -121,7 +121,7 @@ public void addServerCacheMetrics(UUID srvrId, Map newCac Map newCachesMsgsMap = U.newHashMap(newCachesMetrics.size()); newCachesMetrics.forEach((cacheId, cacheMetrics) -> - newCachesMsgsMap.put(cacheId, new CacheMetricsMessage(cacheMetrics))); + newCachesMsgsMap.put(cacheId, new TcpDiscoveryCacheMetricsMessage(cacheMetrics))); srvrFullMetrics.cachesMetricsMessages(newCachesMsgsMap); @@ -141,7 +141,8 @@ public void setClientMetrics(UUID srvrId, UUID clientNodeId, ClusterMetrics clie assert clientNodeId != null; assert clientMetrics != null; - assert serversFullMetricsMsgs.containsKey(srvrId); + if (connectedClientsMetricsMsgs == null) + connectedClientsMetricsMsgs = new HashMap<>(); assert !connectedClientsMetricsMsgs.containsKey(srvrId) || connectedClientsMetricsMsgs.get(srvrId).nodesMetricsMessages().get(clientNodeId) == null; @@ -195,6 +196,11 @@ public void connectedClientsMetricsMessages(Map nodesMetricsMsgs; @@ -35,12 +35,12 @@ public TcpDiscoveryNodesMetricsMapMessage() { // No-op. } - /** @return Map of nodes metrics. */ + /** @return Map of nodes metrics messages per node id. */ public Map nodesMetricsMessages() { return nodesMetricsMsgs; } - /** @param nodesMetricsMsgs Map of nodes metrics. */ + /** @param nodesMetricsMsgs Map of nodes metrics messages per node id. */ public void nodesMetricsMessages(Map nodesMetricsMsgs) { this.nodesMetricsMsgs = nodesMetricsMsgs; }