From 6926c1b9045a4194a851e8b305e8ea4c4e341093 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Dec 2025 15:04:03 +0300 Subject: [PATCH 01/15] research --- .../discovery/DiscoveryMessageFactory.java | 8 +++ .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- ...cpDiscoveryClientMetricsUpdateMessage.java | 38 +++++++++++--- ...pDiscoveryClusterMetricsHolderMessage.java | 49 +++++++++++++++++++ 4 files changed, 89 insertions(+), 9 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index c64315bc448cf..e1e29da4ca98a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryClusterMetricsHolderMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; @@ -30,8 +32,10 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClusterMetricsHolderMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; @@ -44,6 +48,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-100, TcpDiscoveryClusterMetricsHolderMessage::new, + new TcpDiscoveryClusterMetricsHolderMessageSerializer()); + factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer()); factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer()); factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer()); @@ -54,5 +61,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)7, TcpDiscoveryRingLatencyCheckMessage::new, new TcpDiscoveryRingLatencyCheckMessageSerializer()); factory.register((short)8, TcpDiscoveryHandshakeRequest::new, new TcpDiscoveryHandshakeRequestSerializer()); factory.register((short)9, TcpDiscoveryDiscardMessage::new, new TcpDiscoveryDiscardMessageSerializer()); + factory.register((short)10, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 473ff1533578b..5e1b77fefb2e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.NodeValidationFailedEvent; import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -7580,7 +7581,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(msg.metrics()); + wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 8092ef3a7255f..fa62baf044564 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -19,20 +19,28 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Metrics update message. *

* Client sends his metrics in this message. */ -public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final byte[] metrics; + @Order(value = 5, method = "metricsMessage") + private TcpDiscoveryClusterMetricsHolderMessage metricsMsg; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClientMetricsUpdateMessage() { + // No-op. + } /** * Constructor. @@ -43,16 +51,30 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - this.metrics = ClusterMetricsSnapshot.serialize(metrics); + metricsMsg = new TcpDiscoveryClusterMetricsHolderMessage(metrics); + } + + /** + * Gets the metrics message. + * + * @return Metrics holder message. + */ + public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { + return metricsMsg; } /** - * Gets metrics map. + * Sets the metrics message. * - * @return Metrics map. + * @param metricsMsg Metrics holder message. */ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); + public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { + this.metricsMsg = metricsMsg; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 10; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java new file mode 100644 index 0000000000000..ec86ee01c7804 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Utility container message of the node metrics. Is not a pure {@link TcpDiscoveryAbstractMessage}. + * Reuses Communication's {@link NodeMetricsMessage}. + */ +public class TcpDiscoveryClusterMetricsHolderMessage extends NodeMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClusterMetricsHolderMessage() { + // No-op. + } + + /** @param metrics Metrics. */ + public TcpDiscoveryClusterMetricsHolderMessage(ClusterMetrics metrics) { + super(metrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -100; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClusterMetricsHolderMessage.class, this, "super", super.toString()); + } +} From fb3081e15c71180f52a3d28f7db91185bfc3e0bd Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 23 Dec 2025 13:52:52 +0300 Subject: [PATCH 02/15] works --- .../cluster/NodeMetricsMessage.java | 74 +++++++++---------- .../ignite/spi/discovery/tcp/ClientImpl.java | 13 ++++ .../ignite/spi/discovery/tcp/ServerImpl.java | 11 +++ .../discovery/tcp/TcpDiscoveryIoSession.java | 2 +- 4 files changed, 62 insertions(+), 38 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index f11ea1d48c0ba..c76d71b537044 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -145,115 +145,115 @@ public class NodeMetricsMessage implements Message { private double load = -1; /** */ - @Order(value = 27, method = "averageCpuLoad") - private double avgLoad = -1; + //@Order(value = 27, method = "averageCpuLoad") + private double avgCpuLoad = -1; /** */ - @Order(value = 28, method = "currentGcCpuLoad") + //@Order(value = 28, method = "currentGcCpuLoad") private double gcLoad = -1; /** */ - @Order(value = 29, method = "heapMemoryInitialized") + //@Order(value = 29, method = "heapMemoryInitialized") private long heapInit = -1; /** */ - @Order(value = 30, method = "heapMemoryUsed") + //@Order(value = 30, method = "heapMemoryUsed") private long heapUsed = -1; /** */ - @Order(value = 31, method = "heapMemoryCommitted") + //@Order(value = 31, method = "heapMemoryCommitted") private long heapCommitted = -1; /** */ - @Order(value = 32, method = "heapMemoryMaximum") + //@Order(value = 32, method = "heapMemoryMaximum") private long heapMax = -1; /** */ - @Order(value = 33, method = "heapMemoryTotal") + //@Order(value = 33, method = "heapMemoryTotal") private long heapTotal = -1; /** */ - @Order(value = 34, method = "heapMemoryInitialized") + //@Order(value = 34, method = "nonHeapMemoryInitialized") private long nonHeapInit = -1; /** */ - @Order(value = 35, method = "heapMemoryUsed") + //@Order(value = 35, method = "nonHeapMemoryUsed") private long nonHeapUsed = -1; /** */ - @Order(value = 36, method = "heapMemoryCommitted") + //@Order(value = 36, method = "nonHeapMemoryCommitted") private long nonHeapCommitted = -1; /** */ - @Order(value = 37, method = "nonHeapMemoryMaximum") + //@Order(value = 37, method = "nonHeapMemoryMaximum") private long nonHeapMax = -1; /** */ - @Order(value = 38, method = "nonHeapMemoryTotal") + //@Order(value = 38, method = "nonHeapMemoryTotal") private long nonHeapTotal = -1; /** */ - @Order(value = 39) + //@Order(value = 39) private long upTime = -1; /** */ - @Order(value = 40) + //@Order(value = 40) private long startTime = -1; /** */ - @Order(value = 41) + //@Order(value = 41) private long nodeStartTime = -1; /** */ - @Order(value = 42, method = "currentThreadCount") + //@Order(value = 42, method = "currentThreadCount") private int threadCnt = -1; /** */ - @Order(value = 43, method = "maximumThreadCount") + //@Order(value = 43, method = "maximumThreadCount") private int peakThreadCnt = -1; /** */ - @Order(value = 44, method = "totalStartedThreadCount") + //@Order(value = 44, method = "totalStartedThreadCount") private long startedThreadCnt = -1; /** */ - @Order(value = 45, method = "currentDaemonThreadCount") + //@Order(value = 45, method = "currentDaemonThreadCount") private int daemonThreadCnt = -1; /** */ - @Order(value = 46, method = "lastDataVersion") + //@Order(value = 46, method = "lastDataVersion") private long lastDataVer = -1; /** */ - @Order(value = 47, method = "sentMessagesCount") + //@Order(value = 47, method = "sentMessagesCount") private int sentMsgsCnt = -1; /** */ - @Order(value = 48, method = "sentBytesCount") + //@Order(value = 48, method = "sentBytesCount") private long sentBytesCnt = -1; /** */ - @Order(value = 49, method = "receivedMessagesCount") + //@Order(value = 49, method = "receivedMessagesCount") private int rcvdMsgsCnt = -1; /** */ - @Order(value = 50, method = "receivedBytesCount") + //@Order(value = 50, method = "receivedBytesCount") private long rcvdBytesCnt = -1; /** */ - @Order(value = 51, method = "outboundMessagesQueueSize") + //@Order(value = 51, method = "outboundMessagesQueueSize") private int outMesQueueSize = -1; /** */ - @Order(value = 52) + //@Order(value = 52) private int totalNodes = -1; /** */ - @Order(value = 53, method = "totalJobsExecutionTime") + //@Order(value = 53, method = "totalJobsExecutionTime") private long totalJobsExecTime = -1; /** */ - @Order(value = 54) + //@Order(value = 54) private long currentPmeDuration = -1; /** */ @@ -297,7 +297,7 @@ public NodeMetricsMessage(Collection nodes) { curIdleTime = 0; availProcs = 0; load = 0; - avgLoad = 0; + avgCpuLoad = 0; gcLoad = 0; heapInit = 0; heapUsed = 0; @@ -399,7 +399,7 @@ public NodeMetricsMessage(Collection nodes) { rcvdBytesCnt += m.getReceivedBytesCount(); outMesQueueSize += m.getOutboundMessagesQueueSize(); - avgLoad += m.getCurrentCpuLoad(); + avgCpuLoad += m.getCurrentCpuLoad(); currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); } @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection nodes) { avgWaitingJobs /= size; avgJobExecTime /= size; avgJobWaitTime /= size; - avgLoad /= size; + avgCpuLoad /= size; if (!F.isEmpty(nodes)) { ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); @@ -466,7 +466,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { availProcs = metrics.getTotalCpus(); load = metrics.getCurrentCpuLoad(); - avgLoad = metrics.getAverageCpuLoad(); + avgCpuLoad = metrics.getAverageCpuLoad(); gcLoad = metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); @@ -895,7 +895,7 @@ public double currentCpuLoad() { /** */ public double averageCpuLoad() { - return avgLoad; + return avgCpuLoad; } /** */ @@ -1044,10 +1044,10 @@ public void currentCpuLoad(double load) { /** * Sets CPU load average over the metrics history. * - * @param avgLoad CPU load average. + * @param avgCpuLoad CPU load average. */ - public void averageCpuLoad(double avgLoad) { - this.avgLoad = avgLoad; + public void averageCpuLoad(double avgCpuLoad) { + this.avgCpuLoad = avgCpuLoad; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2fbb44bad8443..0246aafa618ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1368,6 +1368,9 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { if (unackedMsg != null) { assert unackedMsg.id().equals(res.messageId()) : unackedMsg; + if (unackedMsg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | ackReceived(), msgId: " + unackedMsg.id()); + unackedMsg = null; } @@ -1418,6 +1421,9 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { if (msg == null) msg = queue.poll(); + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | msg = queue.poll() - TcpDiscoveryClientMetricsUpdateMessage"); + if (msg == null) { mux.wait(); @@ -1440,6 +1446,9 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { } } + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | spi.writeMessage() - TcpDiscoveryClientMetricsUpdateMessage"); + spi.writeMessage(ses, msg, sockTimeout); IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ? @@ -1460,11 +1469,15 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { long nowNanos = System.nanoTime(); while (unackedMsg != null && waitEndNanos - nowNanos > 0) { + log.error("TEST | waitUnacked, msgId: " + unackedMsg.id()); + mux.wait(U.nanosToMillis(waitEndNanos - nowNanos)); nowNanos = System.nanoTime(); } + log.error("TEST | passed unacked"); + unacked = unackedMsg; unackedMsg = null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 5e1b77fefb2e8..5b2da65064177 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -7066,6 +7066,9 @@ else if (e.hasCause(ObjectStreamException.class) || // Use inifinite timeout for accepting new messages. TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, 0); + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | raw received TcpDiscoveryClientMetricsUpdateMessage"); + msg.senderNodeId(nodeId); DebugLogger debugLog = messageLogger(msg); @@ -7291,12 +7294,18 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { msgWorker.addMessage(msg, false, true); } + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | received TcpDiscoveryClientMetricsUpdateMessage"); + // Send receipt back. if (clientMsgWrk != null) { TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id()); ack.verifierNodeId(locNodeId); + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | prepare ack resp, msgId: " + msg.id()); + clientMsgWrk.addMessage(ack); } else @@ -7576,6 +7585,8 @@ else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) * @param msg Client metrics update message. */ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMessage msg) { + log.error("TEST | processClientMetricsUpdateMessage()"); + assert msg.client(); ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..694bff58c20ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,7 +170,7 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } byte b0 = (byte)in.read(); From 37ee5bcd612f7ab2c30cb91bfe6c195edd9cfd0f Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 23 Dec 2025 16:32:19 +0300 Subject: [PATCH 03/15] research --- .../processors/cluster/NodeMetricsMessage.java | 2 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 14 +++++++++----- .../ignite/spi/discovery/tcp/ServerImpl.java | 6 +++--- .../spi/discovery/tcp/TcpDiscoveryIoSession.java | 7 +++++++ 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index c76d71b537044..5c2f2af23eeac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -145,7 +145,7 @@ public class NodeMetricsMessage implements Message { private double load = -1; /** */ - //@Order(value = 27, method = "averageCpuLoad") + @Order(value = 27, method = "averageCpuLoad") private double avgCpuLoad = -1; /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 0246aafa618ec..7ecb502990e8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1307,6 +1307,8 @@ private class SocketWriter extends IgniteSpiThread { */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { synchronized (mux) { + log.error("TEST | enqueue TcpDiscoveryClientMetricsUpdateMessage, msg=" + msg.id()); + queue.add(msg); mux.notifyAll(); @@ -1422,7 +1424,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { msg = queue.poll(); if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | msg = queue.poll() - TcpDiscoveryClientMetricsUpdateMessage"); + log.error("TEST | polled TcpDiscoveryClientMetricsUpdateMessage, msgId: " + msg.id()); if (msg == null) { mux.wait(); @@ -1435,6 +1437,9 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { for (IgniteInClosure msgLsnr : spi.sndMsgLsnrs) msgLsnr.apply(msg); + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | TcpDiscoveryClientMetricsUpdateMessage - after listeners, msgId: " + msg.id()); + boolean ack = !(msg instanceof TcpDiscoveryPingResponse); try { @@ -1447,7 +1452,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { } if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | spi.writeMessage() - TcpDiscoveryClientMetricsUpdateMessage"); + log.error("TEST | spi.writeMessage() - TcpDiscoveryClientMetricsUpdateMessage, msgId: " + msg.id()); spi.writeMessage(ses, msg, sockTimeout); @@ -1469,15 +1474,14 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { long nowNanos = System.nanoTime(); while (unackedMsg != null && waitEndNanos - nowNanos > 0) { - log.error("TEST | waitUnacked, msgId: " + unackedMsg.id()); + if(unackedMsg instanceof TcpDiscoveryClientMetricsUpdateMessage) + log.error("TEST | waitUnacked, msgId: " + unackedMsg.id()); mux.wait(U.nanosToMillis(waitEndNanos - nowNanos)); nowNanos = System.nanoTime(); } - log.error("TEST | passed unacked"); - unacked = unackedMsg; unackedMsg = null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 5b2da65064177..1a2d936e07da8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -7067,7 +7067,7 @@ else if (e.hasCause(ObjectStreamException.class) || TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, 0); if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | raw received TcpDiscoveryClientMetricsUpdateMessage"); + log.error("TEST | received TcpDiscoveryClientMetricsUpdateMessage 1, msgId: " + msg.id()); msg.senderNodeId(nodeId); @@ -7295,7 +7295,7 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { } if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | received TcpDiscoveryClientMetricsUpdateMessage"); + log.error("TEST | received TcpDiscoveryClientMetricsUpdateMessage 2, msgId: " + msg.id()); // Send receipt back. if (clientMsgWrk != null) { @@ -7592,7 +7592,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); + wrk.metrics(new ClusterMetricsSnapshot()); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 694bff58c20ee..c3e6bc94cc5f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -39,6 +39,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; @@ -181,6 +182,9 @@ T readMessage() throws IgniteCheckedException, IOException { msgReader.reset(); msgReader.setBuffer(msgBuf); + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + System.err.println("TEST | received TcpDiscoveryClientMetricsUpdateMessage"); + MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType()); boolean finished; @@ -191,6 +195,9 @@ T readMessage() throws IgniteCheckedException, IOException { int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + if(msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + System.err.println("TEST | read TcpDiscoveryClientMetricsUpdateMessage"); + if (read == -1) throw new EOFException("Connection closed before message was fully read."); From 5692673541361f2705ed0ef988652cae984521bf Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 23 Dec 2025 19:19:42 +0300 Subject: [PATCH 04/15] fix --- .../cluster/NodeMetricsMessage.java | 132 +++++++++--------- .../ignite/spi/discovery/tcp/ClientImpl.java | 17 --- .../ignite/spi/discovery/tcp/ServerImpl.java | 11 -- .../discovery/tcp/TcpDiscoveryIoSession.java | 43 ++++-- 4 files changed, 100 insertions(+), 103 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index 5c2f2af23eeac..55b51cb041dba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -137,124 +137,124 @@ public class NodeMetricsMessage implements Message { private long curIdleTime = -1; /** */ - @Order(value = 25, method = "totalCpus") - private int availProcs = -1; + @Order(value = 25) + private int totalCpus = -1; /** */ @Order(value = 26, method = "currentCpuLoad") - private double load = -1; + private double curCpuLoad = -1; /** */ @Order(value = 27, method = "averageCpuLoad") private double avgCpuLoad = -1; /** */ - //@Order(value = 28, method = "currentGcCpuLoad") - private double gcLoad = -1; + @Order(value = 28, method = "currentGcCpuLoad") + private double curGcCpuLoad = -1; /** */ - //@Order(value = 29, method = "heapMemoryInitialized") + @Order(value = 29, method = "heapMemoryInitialized") private long heapInit = -1; /** */ - //@Order(value = 30, method = "heapMemoryUsed") + @Order(value = 30, method = "heapMemoryUsed") private long heapUsed = -1; /** */ - //@Order(value = 31, method = "heapMemoryCommitted") + @Order(value = 31, method = "heapMemoryCommitted") private long heapCommitted = -1; /** */ - //@Order(value = 32, method = "heapMemoryMaximum") + @Order(value = 32, method = "heapMemoryMaximum") private long heapMax = -1; /** */ - //@Order(value = 33, method = "heapMemoryTotal") + @Order(value = 33, method = "heapMemoryTotal") private long heapTotal = -1; /** */ - //@Order(value = 34, method = "nonHeapMemoryInitialized") + @Order(value = 34, method = "nonHeapMemoryInitialized") private long nonHeapInit = -1; /** */ - //@Order(value = 35, method = "nonHeapMemoryUsed") + @Order(value = 35, method = "nonHeapMemoryUsed") private long nonHeapUsed = -1; /** */ - //@Order(value = 36, method = "nonHeapMemoryCommitted") + @Order(value = 36, method = "nonHeapMemoryCommitted") private long nonHeapCommitted = -1; /** */ - //@Order(value = 37, method = "nonHeapMemoryMaximum") + @Order(value = 37, method = "nonHeapMemoryMaximum") private long nonHeapMax = -1; /** */ - //@Order(value = 38, method = "nonHeapMemoryTotal") + @Order(value = 38, method = "nonHeapMemoryTotal") private long nonHeapTotal = -1; /** */ - //@Order(value = 39) + @Order(value = 39) private long upTime = -1; /** */ - //@Order(value = 40) + @Order(value = 40) private long startTime = -1; /** */ - //@Order(value = 41) + @Order(value = 41) private long nodeStartTime = -1; /** */ - //@Order(value = 42, method = "currentThreadCount") + @Order(value = 42, method = "currentThreadCount") private int threadCnt = -1; /** */ - //@Order(value = 43, method = "maximumThreadCount") + @Order(value = 43, method = "maximumThreadCount") private int peakThreadCnt = -1; /** */ - //@Order(value = 44, method = "totalStartedThreadCount") + @Order(value = 44, method = "totalStartedThreadCount") private long startedThreadCnt = -1; /** */ - //@Order(value = 45, method = "currentDaemonThreadCount") + @Order(value = 45, method = "currentDaemonThreadCount") private int daemonThreadCnt = -1; /** */ - //@Order(value = 46, method = "lastDataVersion") + @Order(value = 46, method = "lastDataVersion") private long lastDataVer = -1; /** */ - //@Order(value = 47, method = "sentMessagesCount") + @Order(value = 47, method = "sentMessagesCount") private int sentMsgsCnt = -1; /** */ - //@Order(value = 48, method = "sentBytesCount") + @Order(value = 48, method = "sentBytesCount") private long sentBytesCnt = -1; /** */ - //@Order(value = 49, method = "receivedMessagesCount") + @Order(value = 49, method = "receivedMessagesCount") private int rcvdMsgsCnt = -1; /** */ - //@Order(value = 50, method = "receivedBytesCount") + @Order(value = 50, method = "receivedBytesCount") private long rcvdBytesCnt = -1; /** */ - //@Order(value = 51, method = "outboundMessagesQueueSize") + @Order(value = 51, method = "outboundMessagesQueueSize") private int outMesQueueSize = -1; /** */ - //@Order(value = 52) + @Order(value = 52) private int totalNodes = -1; /** */ - //@Order(value = 53, method = "totalJobsExecutionTime") + @Order(value = 53, method = "totalJobsExecutionTime") private long totalJobsExecTime = -1; /** */ - //@Order(value = 54) - private long currentPmeDuration = -1; + @Order(value = 54, method = "currentPmeDuration") + private long curPmeDuration = -1; /** */ public NodeMetricsMessage() { @@ -295,10 +295,10 @@ public NodeMetricsMessage(Collection nodes) { totalExecTasks = 0; totalIdleTime = 0; curIdleTime = 0; - availProcs = 0; - load = 0; + totalCpus = 0; + curCpuLoad = 0; avgCpuLoad = 0; - gcLoad = 0; + curGcCpuLoad = 0; heapInit = 0; heapUsed = 0; heapCommitted = 0; @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection nodes) { outMesQueueSize = 0; heapTotal = 0; totalNodes = nodes.size(); - currentPmeDuration = 0; + curPmeDuration = 0; for (ClusterNode node : nodes) { ClusterMetrics m = node.metrics(); @@ -401,7 +401,7 @@ public NodeMetricsMessage(Collection nodes) { avgCpuLoad += m.getCurrentCpuLoad(); - currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); + curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration()); } curJobExecTime /= size; @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection nodes) { Map> neighborhood = U.neighborhood(nodes); - gcLoad = gcCpus(neighborhood); - load = cpus(neighborhood); - availProcs = cpuCnt(neighborhood); + curGcCpuLoad = currentGcCpuLoad(neighborhood); + curCpuLoad = currentCpuLoad(neighborhood); + totalCpus = cpuCnt(neighborhood); } /** */ @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) { curIdleTime = metrics.getCurrentIdleTime(); totalIdleTime = metrics.getTotalIdleTime(); - availProcs = metrics.getTotalCpus(); - load = metrics.getCurrentCpuLoad(); + totalCpus = metrics.getTotalCpus(); + curCpuLoad = metrics.getCurrentCpuLoad(); avgCpuLoad = metrics.getAverageCpuLoad(); - gcLoad = metrics.getCurrentGcCpuLoad(); + curGcCpuLoad = metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); heapUsed = metrics.getHeapMemoryUsed(); @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { lastDataVer = metrics.getLastDataVersion(); - currentPmeDuration = metrics.getCurrentPmeDuration(); + curPmeDuration = metrics.getCurrentPmeDuration(); totalNodes = metrics.getTotalNodes(); @@ -885,12 +885,12 @@ public void currentIdleTime(long curIdleTime) { /** */ public int totalCpus() { - return availProcs; + return totalCpus; } /** */ public double currentCpuLoad() { - return load; + return curCpuLoad; } /** */ @@ -900,7 +900,7 @@ public double averageCpuLoad() { /** */ public double currentGcCpuLoad() { - return gcLoad; + return curGcCpuLoad; } /** */ @@ -1020,25 +1020,25 @@ public int totalNodes() { /** */ public long currentPmeDuration() { - return currentPmeDuration; + return curPmeDuration; } /** * Sets available processors. * - * @param availProcs Available processors. + * @param totalCpus Available processors. */ - public void totalCpus(int availProcs) { - this.availProcs = availProcs; + public void totalCpus(int totalCpus) { + this.totalCpus = totalCpus; } /** * Sets current CPU load. * - * @param load Current CPU load. + * @param curCpuLoad Current CPU load. */ - public void currentCpuLoad(double load) { - this.load = load; + public void currentCpuLoad(double curCpuLoad) { + this.curCpuLoad = curCpuLoad; } /** @@ -1053,10 +1053,10 @@ public void averageCpuLoad(double avgCpuLoad) { /** * Sets current GC load. * - * @param gcLoad Current GC load. + * @param curGcCpuLoad Current GC load. */ - public void currentGcCpuLoad(double gcLoad) { - this.gcLoad = gcLoad; + public void currentGcCpuLoad(double curGcCpuLoad) { + this.curGcCpuLoad = curGcCpuLoad; } /** @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) { * @param curPmeDuration Execution duration for current partition map exchange. */ public void currentPmeDuration(long curPmeDuration) { - this.currentPmeDuration = curPmeDuration; + this.curPmeDuration = curPmeDuration; } /** @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map> neighborhood) { * @param neighborhood Cluster neighborhood. * @return CPU load. */ - private static int cpus(Map> neighborhood) { - int cpus = 0; + private static double currentCpuLoad(Map> neighborhood) { + double curCpuLoad = 0.0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentCpuLoad(); + curCpuLoad += first.metrics().getCurrentCpuLoad(); } - return cpus; + return curCpuLoad; } /** * @param neighborhood Cluster neighborhood. * @return GC CPU load. */ - private static int gcCpus(Map> neighborhood) { - int cpus = 0; + private static double currentGcCpuLoad(Map> neighborhood) { + double curGcCpuLoad = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentGcCpuLoad(); + curGcCpuLoad += first.metrics().getCurrentGcCpuLoad(); } - return cpus; + return curGcCpuLoad; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 7ecb502990e8d..2fbb44bad8443 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1307,8 +1307,6 @@ private class SocketWriter extends IgniteSpiThread { */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { synchronized (mux) { - log.error("TEST | enqueue TcpDiscoveryClientMetricsUpdateMessage, msg=" + msg.id()); - queue.add(msg); mux.notifyAll(); @@ -1370,9 +1368,6 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { if (unackedMsg != null) { assert unackedMsg.id().equals(res.messageId()) : unackedMsg; - if (unackedMsg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | ackReceived(), msgId: " + unackedMsg.id()); - unackedMsg = null; } @@ -1423,9 +1418,6 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { if (msg == null) msg = queue.poll(); - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | polled TcpDiscoveryClientMetricsUpdateMessage, msgId: " + msg.id()); - if (msg == null) { mux.wait(); @@ -1437,9 +1429,6 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { for (IgniteInClosure msgLsnr : spi.sndMsgLsnrs) msgLsnr.apply(msg); - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | TcpDiscoveryClientMetricsUpdateMessage - after listeners, msgId: " + msg.id()); - boolean ack = !(msg instanceof TcpDiscoveryPingResponse); try { @@ -1451,9 +1440,6 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { } } - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | spi.writeMessage() - TcpDiscoveryClientMetricsUpdateMessage, msgId: " + msg.id()); - spi.writeMessage(ses, msg, sockTimeout); IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ? @@ -1474,9 +1460,6 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { long nowNanos = System.nanoTime(); while (unackedMsg != null && waitEndNanos - nowNanos > 0) { - if(unackedMsg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | waitUnacked, msgId: " + unackedMsg.id()); - mux.wait(U.nanosToMillis(waitEndNanos - nowNanos)); nowNanos = System.nanoTime(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 1a2d936e07da8..2526db11b3444 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -7066,9 +7066,6 @@ else if (e.hasCause(ObjectStreamException.class) || // Use inifinite timeout for accepting new messages. TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, 0); - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | received TcpDiscoveryClientMetricsUpdateMessage 1, msgId: " + msg.id()); - msg.senderNodeId(nodeId); DebugLogger debugLog = messageLogger(msg); @@ -7294,18 +7291,12 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { msgWorker.addMessage(msg, false, true); } - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | received TcpDiscoveryClientMetricsUpdateMessage 2, msgId: " + msg.id()); - // Send receipt back. if (clientMsgWrk != null) { TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id()); ack.verifierNodeId(locNodeId); - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - log.error("TEST | prepare ack resp, msgId: " + msg.id()); - clientMsgWrk.addMessage(ack); } else @@ -7585,8 +7576,6 @@ else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator()) * @param msg Client metrics update message. */ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMessage msg) { - log.error("TEST | processClientMetricsUpdateMessage()"); - assert msg.client(); ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index c3e6bc94cc5f9..f54f5fc9c96b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -39,7 +39,6 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; @@ -182,29 +181,52 @@ T readMessage() throws IgniteCheckedException, IOException { msgReader.reset(); msgReader.setBuffer(msgBuf); - if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - System.err.println("TEST | received TcpDiscoveryClientMetricsUpdateMessage"); - MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType()); boolean finished; + byte[] unprocessedBytes = null; + int unprocessedBytesLen = 0; + do { // Should be cleared before first operation. msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + if (unprocessedBytes != null) { + assert unprocessedBytesLen == unprocessedBytes.length; + + msgBuf.put(unprocessedBytes); - if(msg instanceof TcpDiscoveryClientMetricsUpdateMessage) - System.err.println("TEST | read TcpDiscoveryClientMetricsUpdateMessage"); + unprocessedBytes = null; + } + + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (unprocessedBytesLen > 0) { + msgBuf.rewind(); + + msgBuf.limit(read + unprocessedBytesLen); + + unprocessedBytesLen = 0; + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We must keep uprocessed bytes read from the socked. Socket won't return it again. + if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { + unprocessedBytes = new byte[msgBuf.remaining()]; + + unprocessedBytesLen = unprocessedBytes.length; + + msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + } + } + while (!finished); return (T)msg; } @@ -270,6 +292,7 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { msgWriter.setBuffer(msgBuf); boolean finished; + int totalWritten = 0; do { // Should be cleared before first operation. @@ -277,6 +300,8 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { finished = msgSer.writeTo(m, msgWriter); + totalWritten += msgBuf.position(); + out.write(msgBuf.array(), 0, msgBuf.position()); } while (!finished); From 04cef17a452fd5385e044cdb3a48b9a35c516178 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 24 Dec 2025 14:56:07 +0300 Subject: [PATCH 05/15] + master --- .../tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java | 2 +- .../tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index fa62baf044564..144ada143e9df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -74,7 +74,7 @@ public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { /** {@inheritDoc} */ @Override public short directType() { - return 10; + return 11; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java index ec86ee01c7804..9252a4addbb8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java @@ -39,7 +39,7 @@ public TcpDiscoveryClusterMetricsHolderMessage(ClusterMetrics metrics) { /** {@inheritDoc} */ @Override public short directType() { - return -100; + return -101; } /** {@inheritDoc} */ From feafc65fef992cdc08159cdfa8c2c0558a757a0d Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 24 Dec 2025 15:05:35 +0300 Subject: [PATCH 06/15] fix --- .../java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 2 +- .../apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2526db11b3444..5e1b77fefb2e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -7581,7 +7581,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(new ClusterMetricsSnapshot()); + wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index f54f5fc9c96b6..97d42c687a73b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,6 +170,8 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); + // IOException type is important for ServerImpl for connection error processing behavior. + // It may search the cause (X.hasCause). throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } From 877755f01a294c56e3252697b592cd18ec2a6322 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 24 Dec 2025 15:06:07 +0300 Subject: [PATCH 07/15] minor --- .../apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 97d42c687a73b..2bb3a9e0e6c4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -219,7 +219,7 @@ T readMessage() throws IgniteCheckedException, IOException { finished = msgSer.readFrom(msg, msgReader); - // We must keep uprocessed bytes read from the socked. Socket won't return it again. + // We must keepthe uprocessed bytes read from the socket. It won't return it again. if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { unprocessedBytes = new byte[msgBuf.remaining()]; From ec8590bf855f964c8f83f81b21c49dcf0bf40b1c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 24 Dec 2025 15:14:36 +0300 Subject: [PATCH 08/15] cleanup --- .../ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 2bb3a9e0e6c4e..00f7f69b8d4da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -219,7 +219,7 @@ T readMessage() throws IgniteCheckedException, IOException { finished = msgSer.readFrom(msg, msgReader); - // We must keepthe uprocessed bytes read from the socket. It won't return it again. + // We must keep the uprocessed bytes read from the socket. It won't return them again. if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { unprocessedBytes = new byte[msgBuf.remaining()]; @@ -294,7 +294,6 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { msgWriter.setBuffer(msgBuf); boolean finished; - int totalWritten = 0; do { // Should be cleared before first operation. @@ -302,8 +301,6 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { finished = msgSer.writeTo(m, msgWriter); - totalWritten += msgBuf.position(); - out.write(msgBuf.array(), 0, msgBuf.position()); } while (!finished); From ec68334c5163ecd9c5970bac33be3e1d2ec9e76e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 29 Dec 2025 19:29:24 +0300 Subject: [PATCH 09/15] fix --- .../discovery/tcp/TcpDiscoveryIoSession.java | 47 ++++++++----------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 00f7f69b8d4da..ce550b191c529 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,7 +170,7 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - // IOException type is important for ServerImpl for connection error processing behavior. + // IOException type is important for ServerImpl. The connection error processing behavior depends on it. // It may search the cause (X.hasCause). throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } @@ -187,48 +187,41 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - byte[] unprocessedBytes = null; - int unprocessedBytesLen = 0; + // Should be cleared before first operation. + msgBuf.clear(); do { - // Should be cleared before first operation. - msgBuf.clear(); - - if (unprocessedBytes != null) { - assert unprocessedBytesLen == unprocessedBytes.length; - - msgBuf.put(unprocessedBytes); - - unprocessedBytes = null; - } - int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - if (unprocessedBytesLen > 0) { - msgBuf.rewind(); + msgBuf.limit(read + msgBuf.position()); - msgBuf.limit(read + unprocessedBytesLen); + finished = msgSer.readFrom(msg, msgReader); - unprocessedBytesLen = 0; - } - else - msgBuf.limit(read); + // We assume that there is only one message in the socket because Discovery is a serial protocol with + // acking of each message before sending next. + assert msgBuf.remaining() == 0 || finished : "Some data was read from the socket but left unprocessed."; - finished = msgSer.readFrom(msg, msgReader); + if (finished) + break; // We must keep the uprocessed bytes read from the socket. It won't return them again. - if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { - unprocessedBytes = new byte[msgBuf.remaining()]; + if (msgBuf.remaining() > 0 && msgBuf.position() > 0) { + // Shift left the bytes rest. + byte[] unprocessedTail = new byte[msgBuf.remaining()]; + + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); - unprocessedBytesLen = unprocessedBytes.length; + msgBuf.clear(); - msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + msgBuf.put(unprocessedTail); } + else + msgBuf.clear(); } - while (!finished); + while (true); return (T)msg; } From 875e70b0f9f8334ec39610615d74959d79af84b8 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 29 Dec 2025 19:29:24 +0300 Subject: [PATCH 10/15] fix --- .../discovery/tcp/TcpDiscoveryIoSession.java | 53 ++++++++----------- 1 file changed, 21 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 00f7f69b8d4da..53495923bb2ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,15 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - // IOException type is important for ServerImpl for connection error processing behavior. - // It may search the cause (X.hasCause). + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -187,48 +184,40 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - byte[] unprocessedBytes = null; - int unprocessedBytesLen = 0; + msgBuf.clear(); do { - // Should be cleared before first operation. - msgBuf.clear(); - - if (unprocessedBytes != null) { - assert unprocessedBytesLen == unprocessedBytes.length; - - msgBuf.put(unprocessedBytes); - - unprocessedBytes = null; - } - int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - if (unprocessedBytesLen > 0) { - msgBuf.rewind(); + msgBuf.limit(msgBuf.position() > 0 ? msgBuf.position() + read + 1 : read); - msgBuf.limit(read + unprocessedBytesLen); + finished = msgSer.readFrom(msg, msgReader); - unprocessedBytesLen = 0; - } - else - msgBuf.limit(read); + // We assume that there is only one message in the socket because Discovery is a serial protocol with + // acking of each message before sending next. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; - finished = msgSer.readFrom(msg, msgReader); + if (finished) + break; // We must keep the uprocessed bytes read from the socket. It won't return them again. - if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { - unprocessedBytes = new byte[msgBuf.remaining()]; + if (msgBuf.remaining() > 0 && msgBuf.position() > 0) { + // Shift left the bytes rest. + byte[] unprocessedTail = new byte[msgBuf.remaining()]; + + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); - unprocessedBytesLen = unprocessedBytes.length; + msgBuf.clear(); - msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + msgBuf.put(unprocessedTail); } + else + msgBuf.clear(); } - while (!finished); + while (true); return (T)msg; } From 27da78fdc823f9182181a66354d95a2277840d33 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 30 Dec 2025 13:14:04 +0300 Subject: [PATCH 11/15] fix --- .../apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 53495923bb2ad..32642e2730b89 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -204,7 +204,7 @@ T readMessage() throws IgniteCheckedException, IOException { break; // We must keep the uprocessed bytes read from the socket. It won't return them again. - if (msgBuf.remaining() > 0 && msgBuf.position() > 0) { + if (msgBuf.remaining() > 0) { // Shift left the bytes rest. byte[] unprocessedTail = new byte[msgBuf.remaining()]; From 5b7b53abddff9bbbc83726dc053937e482a36040 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 30 Dec 2025 13:19:36 +0300 Subject: [PATCH 12/15] fix --- .../spi/discovery/tcp/TcpDiscoveryIoSession.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 32642e2730b89..5988603740286 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -204,18 +204,17 @@ T readMessage() throws IgniteCheckedException, IOException { break; // We must keep the uprocessed bytes read from the socket. It won't return them again. - if (msgBuf.remaining() > 0) { - // Shift left the bytes rest. - byte[] unprocessedTail = new byte[msgBuf.remaining()]; + byte[] unprocessedTail = null; + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); + } - msgBuf.clear(); + msgBuf.clear(); + if (unprocessedTail != null) msgBuf.put(unprocessedTail); - } - else - msgBuf.clear(); } while (true); From 40b6df0942272a8761e17d20b44225b231a46b85 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 30 Dec 2025 13:49:05 +0300 Subject: [PATCH 13/15] fix --- .../managers/discovery/DiscoveryMessageFactory.java | 7 +++---- .../spi/discovery/tcp/TcpDiscoveryIoSession.java | 5 +++-- .../TcpDiscoveryClientMetricsUpdateMessage.java | 8 ++++---- ...age.java => TcpDiscoveryNodeMetricsMessage.java} | 13 +++++++------ 4 files changed, 17 insertions(+), 16 deletions(-) rename modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/{TcpDiscoveryClusterMetricsHolderMessage.java => TcpDiscoveryNodeMetricsMessage.java} (71%) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index f7ff332d8869c..b295f26c20155 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -22,12 +22,12 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; -import org.apache.ignite.internal.codegen.TcpDiscoveryClusterMetricsHolderMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -38,12 +38,12 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClusterMetricsHolderMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -52,8 +52,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { - factory.register((short)-101, TcpDiscoveryClusterMetricsHolderMessage::new, - new TcpDiscoveryClusterMetricsHolderMessageSerializer()); + factory.register((short)-101, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5988603740286..92a20088bf4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -196,8 +196,9 @@ T readMessage() throws IgniteCheckedException, IOException { finished = msgSer.readFrom(msg, msgReader); - // We assume that there is only one message in the socket because Discovery is a serial protocol with - // acking of each message before sending next. + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; if (finished) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 144ada143e9df..d233e126c94d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -35,7 +35,7 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract /** */ @Order(value = 5, method = "metricsMessage") - private TcpDiscoveryClusterMetricsHolderMessage metricsMsg; + private TcpDiscoveryNodeMetricsMessage metricsMsg; /** Constructor for {@link DiscoveryMessageFactory}. */ public TcpDiscoveryClientMetricsUpdateMessage() { @@ -51,7 +51,7 @@ public TcpDiscoveryClientMetricsUpdateMessage() { public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - metricsMsg = new TcpDiscoveryClusterMetricsHolderMessage(metrics); + metricsMsg = new TcpDiscoveryNodeMetricsMessage(metrics); } /** @@ -59,7 +59,7 @@ public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics * * @return Metrics holder message. */ - public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { + public TcpDiscoveryNodeMetricsMessage metricsMessage() { return metricsMsg; } @@ -68,7 +68,7 @@ public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { * * @param metricsMsg Metrics holder message. */ - public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { + public void metricsMessage(TcpDiscoveryNodeMetricsMessage metricsMsg) { this.metricsMsg = metricsMsg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java similarity index 71% rename from modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java rename to modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java index 9252a4addbb8a..02ac999a004f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java @@ -23,17 +23,18 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** - * Utility container message of the node metrics. Is not a pure {@link TcpDiscoveryAbstractMessage}. - * Reuses Communication's {@link NodeMetricsMessage}. + * We cannot directly reuse `NodeMetricsMessage` in Discovery as it is registered in a message factory of Communication + * component and thus is unavailable in Discovery. We have to extend `NodeMetricsMessage` and register this subclass in + * message factory of Discovery component. */ -public class TcpDiscoveryClusterMetricsHolderMessage extends NodeMetricsMessage { +public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage { /** Constructor for {@link DiscoveryMessageFactory}. */ - public TcpDiscoveryClusterMetricsHolderMessage() { + public TcpDiscoveryNodeMetricsMessage() { // No-op. } /** @param metrics Metrics. */ - public TcpDiscoveryClusterMetricsHolderMessage(ClusterMetrics metrics) { + public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) { super(metrics); } @@ -44,6 +45,6 @@ public TcpDiscoveryClusterMetricsHolderMessage(ClusterMetrics metrics) { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(TcpDiscoveryClusterMetricsHolderMessage.class, this, "super", super.toString()); + return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", super.toString()); } } From af40fc4a275d0400c09c5001dbfbf006e49abdc8 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 30 Dec 2025 13:54:09 +0300 Subject: [PATCH 14/15] fix. + master --- .../tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java | 2 +- .../discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index d233e126c94d1..f110c40e8cb92 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -74,7 +74,7 @@ public void metricsMessage(TcpDiscoveryNodeMetricsMessage metricsMsg) { /** {@inheritDoc} */ @Override public short directType() { - return 11; + return 13; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java index 02ac999a004f5..0c12acee04a2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java @@ -40,7 +40,7 @@ public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) { /** {@inheritDoc} */ @Override public short directType() { - return -101; + return -102; } /** {@inheritDoc} */ From 866eec0b503442dbbbe62e5f3fa63b25202ecdc8 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 3 Jan 2026 01:55:35 +0300 Subject: [PATCH 15/15] fix --- .../ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 92a20088bf4c5..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -192,7 +192,14 @@ T readMessage() throws IgniteCheckedException, IOException { if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(msgBuf.position() > 0 ? msgBuf.position() + read + 1 : read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader);