From 08683e150f57af18c67778762121c31cf7666833 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 10 Dec 2025 21:39:58 +0300 Subject: [PATCH 1/8] raw --- .../discovery/DiscoveryMessageFactory.java | 3 +++ .../tracing/messages/SpanContainer.java | 2 +- .../TcpDiscoveryAbstractTraceableMessage.java | 27 ++++++++++++++++--- .../messages/TcpDiscoveryNodeLeftMessage.java | 14 +++++++++- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1ac4567b31ea0..0b563d48716db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -20,6 +20,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -27,6 +28,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; @@ -39,5 +41,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer()); factory.register((short)3, TcpDiscoveryClientPingRequest::new, new TcpDiscoveryClientPingRequestSerializer()); factory.register((short)4, TcpDiscoveryClientPingResponse::new, new TcpDiscoveryClientPingResponseSerializer()); + factory.register((short)5, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java index c1f73f47d2cee..dc60d51f0d13f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java @@ -46,7 +46,7 @@ public byte[] serializedSpanBytes() { * @param serializedSpan Serialized span. */ public void serializedSpanBytes(byte[] serializedSpan) { - this.serializedSpanBytes = serializedSpan.clone(); + serializedSpanBytes = serializedSpan.clone(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index 433566d7e3021..3df159a48e806 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -17,10 +17,12 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; +import org.jetbrains.annotations.Nullable; /** * Abstract traceable message for TCP discovery. @@ -29,8 +31,12 @@ public abstract class TcpDiscoveryAbstractTraceableMessage extends TcpDiscoveryA /** Container. */ private SpanContainer spanContainer = new SpanContainer(); + /** Serialization holder of {@link #spanContainer}'s bytes. */ + @Order(value = 5, method = "spanBytes") + private @Nullable byte[] spanBytesHolder; + /** - * Default no-arg constructor for {@link Externalizable} interface. + * Default constructor for {@link DiscoveryMessageFactory}. */ protected TcpDiscoveryAbstractTraceableMessage() { // No-op. @@ -51,7 +57,7 @@ protected TcpDiscoveryAbstractTraceableMessage(UUID creatorNodeId) { protected TcpDiscoveryAbstractTraceableMessage(TcpDiscoveryAbstractTraceableMessage msg) { super(msg); - this.spanContainer = msg.spanContainer; + spanContainer = msg.spanContainer; } /** @@ -67,6 +73,21 @@ public Object readResolve() { return this; } + /** @return {@link #spanContainer}'s bytes. */ + public @Nullable byte[] spanBytes() { + return spanContainer == null ? null : spanContainer.serializedSpanBytes(); + } + + /** @param spanBytes {@link #spanContainer}'s bytes. */ + public void spanBytes(@Nullable byte[] spanBytes) { + if (spanBytes == null) + return; + + readResolve(); + + spanContainer.serializedSpanBytes(spanBytes); + } + /** {@inheritDoc} */ @Override public SpanContainer spanContainer() { return spanContainer; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 5d6df69b715b7..350bb3eaa647b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -26,10 +28,15 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; + /** Default constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + /** * Constructor. * @@ -43,4 +50,9 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); } + + /** */ + @Override public short directType() { + return 5; + } } From ae56c1cd4b0770d3da0aaadd8222f0cb6a4b8ae6 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 11 Dec 2025 22:06:54 +0300 Subject: [PATCH 2/8] class cast fixes --- .../discovery/tcp/messages/TcpDiscoveryAbstractMessage.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index c8fea72ec3cfd..d15a642a1db83 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -293,8 +293,10 @@ public void addFailedNode(UUID nodeId) { /** * @param failedNodes Failed nodes. */ - public void failedNodes(@Nullable Set failedNodes) { - this.failedNodes = failedNodes; + public void failedNodes(@Nullable Collection failedNodes) { + this.failedNodes = failedNodes == null + ? null + : failedNodes instanceof Set ? (Set)failedNodes : new HashSet<>(failedNodes); } /** From 71e0fe20ec367988fff2cd1667f3ae326079be22 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 12 Dec 2025 00:03:55 +0300 Subject: [PATCH 3/8] exception type fix --- .../ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..b64cea9133710 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,7 +170,9 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + // There are many `X.hasCause` in the discovery errors processing which change connection recovery processing. + // It is better to throw an IOException on reading failures. Often happens at nodes stop and streams closing. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } byte b0 = (byte)in.read(); From f8b0a8d047d80069bbd5778108a2975db9792156 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 15 Dec 2025 18:19:46 +0300 Subject: [PATCH 4/8] merged master --- .../internal/managers/discovery/DiscoveryMessageFactory.java | 4 ++-- .../discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java | 2 +- .../org/apache/ignite/internal/IgniteClientRejoinTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 0650e2f8e6c76..1d0367c9a72b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -29,10 +30,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; -import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; /** Message factory for discovery messages. */ public class DiscoveryMessageFactory implements MessageFactoryProvider { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 350bb3eaa647b..ffc1180890928 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -53,6 +53,6 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** */ @Override public short directType() { - return 5; + return 6; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java index b29a8171fbce4..28f3d7255db02 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -264,7 +264,7 @@ public void testClientsReconnect() throws Exception { * @throws Exception If failed. */ @Test - public void testClientsReconnectDisabled() throws Exception { + public void testClientsRecocnnectDisabled() throws Exception { clientReconnectDisabled = true; Ignite srv1 = startGrid("server1"); From 9ff2e0699dedc9eac0c44f2fea6ba6da4803dcb9 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 24 Dec 2025 19:06:46 +0300 Subject: [PATCH 5/8] + master --- .../cluster/NodeMetricsMessage.java | 104 +++++++++--------- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- .../discovery/tcp/TcpDiscoveryIoSession.java | 39 ++++++- .../messages/TcpDiscoveryAbstractMessage.java | 6 +- ...cpDiscoveryClientMetricsUpdateMessage.java | 38 +++++-- ...pDiscoveryClusterMetricsHolderMessage.java | 49 +++++++++ .../messages/TcpDiscoveryNodeLeftMessage.java | 2 +- .../internal/IgniteClientRejoinTest.java | 2 +- 8 files changed, 171 insertions(+), 72 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index f11ea1d48c0ba..55b51cb041dba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message { private long curIdleTime = -1; /** */ - @Order(value = 25, method = "totalCpus") - private int availProcs = -1; + @Order(value = 25) + private int totalCpus = -1; /** */ @Order(value = 26, method = "currentCpuLoad") - private double load = -1; + private double curCpuLoad = -1; /** */ @Order(value = 27, method = "averageCpuLoad") - private double avgLoad = -1; + private double avgCpuLoad = -1; /** */ @Order(value = 28, method = "currentGcCpuLoad") - private double gcLoad = -1; + private double curGcCpuLoad = -1; /** */ @Order(value = 29, method = "heapMemoryInitialized") @@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message { private long heapTotal = -1; /** */ - @Order(value = 34, method = "heapMemoryInitialized") + @Order(value = 34, method = "nonHeapMemoryInitialized") private long nonHeapInit = -1; /** */ - @Order(value = 35, method = "heapMemoryUsed") + @Order(value = 35, method = "nonHeapMemoryUsed") private long nonHeapUsed = -1; /** */ - @Order(value = 36, method = "heapMemoryCommitted") + @Order(value = 36, method = "nonHeapMemoryCommitted") private long nonHeapCommitted = -1; /** */ @@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message { private long totalJobsExecTime = -1; /** */ - @Order(value = 54) - private long currentPmeDuration = -1; + @Order(value = 54, method = "currentPmeDuration") + private long curPmeDuration = -1; /** */ public NodeMetricsMessage() { @@ -295,10 +295,10 @@ public NodeMetricsMessage(Collection nodes) { totalExecTasks = 0; totalIdleTime = 0; curIdleTime = 0; - availProcs = 0; - load = 0; - avgLoad = 0; - gcLoad = 0; + totalCpus = 0; + curCpuLoad = 0; + avgCpuLoad = 0; + curGcCpuLoad = 0; heapInit = 0; heapUsed = 0; heapCommitted = 0; @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection nodes) { outMesQueueSize = 0; heapTotal = 0; totalNodes = nodes.size(); - currentPmeDuration = 0; + curPmeDuration = 0; for (ClusterNode node : nodes) { ClusterMetrics m = node.metrics(); @@ -399,9 +399,9 @@ public NodeMetricsMessage(Collection nodes) { rcvdBytesCnt += m.getReceivedBytesCount(); outMesQueueSize += m.getOutboundMessagesQueueSize(); - avgLoad += m.getCurrentCpuLoad(); + avgCpuLoad += m.getCurrentCpuLoad(); - currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); + curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration()); } curJobExecTime /= size; @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection nodes) { avgWaitingJobs /= size; avgJobExecTime /= size; avgJobWaitTime /= size; - avgLoad /= size; + avgCpuLoad /= size; if (!F.isEmpty(nodes)) { ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection nodes) { Map> neighborhood = U.neighborhood(nodes); - gcLoad = gcCpus(neighborhood); - load = cpus(neighborhood); - availProcs = cpuCnt(neighborhood); + curGcCpuLoad = currentGcCpuLoad(neighborhood); + curCpuLoad = currentCpuLoad(neighborhood); + totalCpus = cpuCnt(neighborhood); } /** */ @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) { curIdleTime = metrics.getCurrentIdleTime(); totalIdleTime = metrics.getTotalIdleTime(); - availProcs = metrics.getTotalCpus(); - load = metrics.getCurrentCpuLoad(); - avgLoad = metrics.getAverageCpuLoad(); - gcLoad = metrics.getCurrentGcCpuLoad(); + totalCpus = metrics.getTotalCpus(); + curCpuLoad = metrics.getCurrentCpuLoad(); + avgCpuLoad = metrics.getAverageCpuLoad(); + curGcCpuLoad = metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); heapUsed = metrics.getHeapMemoryUsed(); @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { lastDataVer = metrics.getLastDataVersion(); - currentPmeDuration = metrics.getCurrentPmeDuration(); + curPmeDuration = metrics.getCurrentPmeDuration(); totalNodes = metrics.getTotalNodes(); @@ -885,22 +885,22 @@ public void currentIdleTime(long curIdleTime) { /** */ public int totalCpus() { - return availProcs; + return totalCpus; } /** */ public double currentCpuLoad() { - return load; + return curCpuLoad; } /** */ public double averageCpuLoad() { - return avgLoad; + return avgCpuLoad; } /** */ public double currentGcCpuLoad() { - return gcLoad; + return curGcCpuLoad; } /** */ @@ -1020,43 +1020,43 @@ public int totalNodes() { /** */ public long currentPmeDuration() { - return currentPmeDuration; + return curPmeDuration; } /** * Sets available processors. * - * @param availProcs Available processors. + * @param totalCpus Available processors. */ - public void totalCpus(int availProcs) { - this.availProcs = availProcs; + public void totalCpus(int totalCpus) { + this.totalCpus = totalCpus; } /** * Sets current CPU load. * - * @param load Current CPU load. + * @param curCpuLoad Current CPU load. */ - public void currentCpuLoad(double load) { - this.load = load; + public void currentCpuLoad(double curCpuLoad) { + this.curCpuLoad = curCpuLoad; } /** * Sets CPU load average over the metrics history. * - * @param avgLoad CPU load average. + * @param avgCpuLoad CPU load average. */ - public void averageCpuLoad(double avgLoad) { - this.avgLoad = avgLoad; + public void averageCpuLoad(double avgCpuLoad) { + this.avgCpuLoad = avgCpuLoad; } /** * Sets current GC load. * - * @param gcLoad Current GC load. + * @param curGcCpuLoad Current GC load. */ - public void currentGcCpuLoad(double gcLoad) { - this.gcLoad = gcLoad; + public void currentGcCpuLoad(double curGcCpuLoad) { + this.curGcCpuLoad = curGcCpuLoad; } /** @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) { * @param curPmeDuration Execution duration for current partition map exchange. */ public void currentPmeDuration(long curPmeDuration) { - this.currentPmeDuration = curPmeDuration; + this.curPmeDuration = curPmeDuration; } /** @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map> neighborhood) { * @param neighborhood Cluster neighborhood. * @return CPU load. */ - private static int cpus(Map> neighborhood) { - int cpus = 0; + private static double currentCpuLoad(Map> neighborhood) { + double curCpuLoad = 0.0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentCpuLoad(); + curCpuLoad += first.metrics().getCurrentCpuLoad(); } - return cpus; + return curCpuLoad; } /** * @param neighborhood Cluster neighborhood. * @return GC CPU load. */ - private static int gcCpus(Map> neighborhood) { - int cpus = 0; + private static double currentGcCpuLoad(Map> neighborhood) { + double curGcCpuLoad = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentGcCpuLoad(); + curGcCpuLoad += first.metrics().getCurrentGcCpuLoad(); } - return cpus; + return curGcCpuLoad; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 473ff1533578b..5e1b77fefb2e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.NodeValidationFailedEvent; import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -7580,7 +7581,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(msg.metrics()); + wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index b64cea9133710..00f7f69b8d4da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,8 +170,8 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - // There are many `X.hasCause` in the discovery errors processing which change connection recovery processing. - // It is better to throw an IOException on reading failures. Often happens at nodes stop and streams closing. + // IOException type is important for ServerImpl for connection error processing behavior. + // It may search the cause (X.hasCause). throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } @@ -187,19 +187,48 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; + byte[] unprocessedBytes = null; + int unprocessedBytesLen = 0; + do { // Should be cleared before first operation. msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + if (unprocessedBytes != null) { + assert unprocessedBytesLen == unprocessedBytes.length; + + msgBuf.put(unprocessedBytes); + + unprocessedBytes = null; + } + + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (unprocessedBytesLen > 0) { + msgBuf.rewind(); + + msgBuf.limit(read + unprocessedBytesLen); + + unprocessedBytesLen = 0; + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We must keep the uprocessed bytes read from the socket. It won't return them again. + if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { + unprocessedBytes = new byte[msgBuf.remaining()]; + + unprocessedBytesLen = unprocessedBytes.length; + + msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + } + } + while (!finished); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 809cf9bbd798f..17083499ec434 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -289,10 +289,8 @@ public void addFailedNode(UUID nodeId) { /** * @param failedNodes Failed nodes. */ - public void failedNodes(@Nullable Collection failedNodes) { - this.failedNodes = failedNodes == null - ? null - : failedNodes instanceof Set ? (Set)failedNodes : new HashSet<>(failedNodes); + public void failedNodes(@Nullable Set failedNodes) { + this.failedNodes = failedNodes; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 8092ef3a7255f..144ada143e9df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -19,20 +19,28 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Metrics update message. *

* Client sends his metrics in this message. */ -public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final byte[] metrics; + @Order(value = 5, method = "metricsMessage") + private TcpDiscoveryClusterMetricsHolderMessage metricsMsg; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClientMetricsUpdateMessage() { + // No-op. + } /** * Constructor. @@ -43,16 +51,30 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - this.metrics = ClusterMetricsSnapshot.serialize(metrics); + metricsMsg = new TcpDiscoveryClusterMetricsHolderMessage(metrics); + } + + /** + * Gets the metrics message. + * + * @return Metrics holder message. + */ + public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { + return metricsMsg; } /** - * Gets metrics map. + * Sets the metrics message. * - * @return Metrics map. + * @param metricsMsg Metrics holder message. */ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); + public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { + this.metricsMsg = metricsMsg; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 11; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java new file mode 100644 index 0000000000000..9252a4addbb8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClusterMetricsHolderMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Utility container message of the node metrics. Is not a pure {@link TcpDiscoveryAbstractMessage}. + * Reuses Communication's {@link NodeMetricsMessage}. + */ +public class TcpDiscoveryClusterMetricsHolderMessage extends NodeMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClusterMetricsHolderMessage() { + // No-op. + } + + /** @param metrics Metrics. */ + public TcpDiscoveryClusterMetricsHolderMessage(ClusterMetrics metrics) { + super(metrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -101; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClusterMetricsHolderMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index ffc1180890928..aec40df3a900d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -53,6 +53,6 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** */ @Override public short directType() { - return 6; + return 11; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java index 28f3d7255db02..b29a8171fbce4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -264,7 +264,7 @@ public void testClientsReconnect() throws Exception { * @throws Exception If failed. */ @Test - public void testClientsRecocnnectDisabled() throws Exception { + public void testClientsReconnectDisabled() throws Exception { clientReconnectDisabled = true; Ignite srv1 = startGrid("server1"); From e911cb1d28f06e4468157dfb1a019f09fc3f7a38 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 1 Jan 2026 16:42:27 +0300 Subject: [PATCH 6/8] +master --- .../discovery/DiscoveryMessageFactory.java | 3 - .../cluster/NodeMetricsMessage.java | 104 +++++++++--------- .../tracing/messages/SpanContainer.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- .../discovery/tcp/TcpDiscoveryIoSession.java | 55 ++++----- .../TcpDiscoveryAbstractTraceableMessage.java | 2 +- ...cpDiscoveryClientMetricsUpdateMessage.java | 38 ++----- .../messages/TcpDiscoveryNodeLeftMessage.java | 14 +-- 8 files changed, 86 insertions(+), 135 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index cf2ea37fe71b1..1c01592fd0dd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; -import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -47,7 +46,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -72,6 +70,5 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); - factory.register((short)11, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index 55b51cb041dba..f11ea1d48c0ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message { private long curIdleTime = -1; /** */ - @Order(value = 25) - private int totalCpus = -1; + @Order(value = 25, method = "totalCpus") + private int availProcs = -1; /** */ @Order(value = 26, method = "currentCpuLoad") - private double curCpuLoad = -1; + private double load = -1; /** */ @Order(value = 27, method = "averageCpuLoad") - private double avgCpuLoad = -1; + private double avgLoad = -1; /** */ @Order(value = 28, method = "currentGcCpuLoad") - private double curGcCpuLoad = -1; + private double gcLoad = -1; /** */ @Order(value = 29, method = "heapMemoryInitialized") @@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message { private long heapTotal = -1; /** */ - @Order(value = 34, method = "nonHeapMemoryInitialized") + @Order(value = 34, method = "heapMemoryInitialized") private long nonHeapInit = -1; /** */ - @Order(value = 35, method = "nonHeapMemoryUsed") + @Order(value = 35, method = "heapMemoryUsed") private long nonHeapUsed = -1; /** */ - @Order(value = 36, method = "nonHeapMemoryCommitted") + @Order(value = 36, method = "heapMemoryCommitted") private long nonHeapCommitted = -1; /** */ @@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message { private long totalJobsExecTime = -1; /** */ - @Order(value = 54, method = "currentPmeDuration") - private long curPmeDuration = -1; + @Order(value = 54) + private long currentPmeDuration = -1; /** */ public NodeMetricsMessage() { @@ -295,10 +295,10 @@ public NodeMetricsMessage(Collection nodes) { totalExecTasks = 0; totalIdleTime = 0; curIdleTime = 0; - totalCpus = 0; - curCpuLoad = 0; - avgCpuLoad = 0; - curGcCpuLoad = 0; + availProcs = 0; + load = 0; + avgLoad = 0; + gcLoad = 0; heapInit = 0; heapUsed = 0; heapCommitted = 0; @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection nodes) { outMesQueueSize = 0; heapTotal = 0; totalNodes = nodes.size(); - curPmeDuration = 0; + currentPmeDuration = 0; for (ClusterNode node : nodes) { ClusterMetrics m = node.metrics(); @@ -399,9 +399,9 @@ public NodeMetricsMessage(Collection nodes) { rcvdBytesCnt += m.getReceivedBytesCount(); outMesQueueSize += m.getOutboundMessagesQueueSize(); - avgCpuLoad += m.getCurrentCpuLoad(); + avgLoad += m.getCurrentCpuLoad(); - curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration()); + currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); } curJobExecTime /= size; @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection nodes) { avgWaitingJobs /= size; avgJobExecTime /= size; avgJobWaitTime /= size; - avgCpuLoad /= size; + avgLoad /= size; if (!F.isEmpty(nodes)) { ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection nodes) { Map> neighborhood = U.neighborhood(nodes); - curGcCpuLoad = currentGcCpuLoad(neighborhood); - curCpuLoad = currentCpuLoad(neighborhood); - totalCpus = cpuCnt(neighborhood); + gcLoad = gcCpus(neighborhood); + load = cpus(neighborhood); + availProcs = cpuCnt(neighborhood); } /** */ @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) { curIdleTime = metrics.getCurrentIdleTime(); totalIdleTime = metrics.getTotalIdleTime(); - totalCpus = metrics.getTotalCpus(); - curCpuLoad = metrics.getCurrentCpuLoad(); - avgCpuLoad = metrics.getAverageCpuLoad(); - curGcCpuLoad = metrics.getCurrentGcCpuLoad(); + availProcs = metrics.getTotalCpus(); + load = metrics.getCurrentCpuLoad(); + avgLoad = metrics.getAverageCpuLoad(); + gcLoad = metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); heapUsed = metrics.getHeapMemoryUsed(); @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { lastDataVer = metrics.getLastDataVersion(); - curPmeDuration = metrics.getCurrentPmeDuration(); + currentPmeDuration = metrics.getCurrentPmeDuration(); totalNodes = metrics.getTotalNodes(); @@ -885,22 +885,22 @@ public void currentIdleTime(long curIdleTime) { /** */ public int totalCpus() { - return totalCpus; + return availProcs; } /** */ public double currentCpuLoad() { - return curCpuLoad; + return load; } /** */ public double averageCpuLoad() { - return avgCpuLoad; + return avgLoad; } /** */ public double currentGcCpuLoad() { - return curGcCpuLoad; + return gcLoad; } /** */ @@ -1020,43 +1020,43 @@ public int totalNodes() { /** */ public long currentPmeDuration() { - return curPmeDuration; + return currentPmeDuration; } /** * Sets available processors. * - * @param totalCpus Available processors. + * @param availProcs Available processors. */ - public void totalCpus(int totalCpus) { - this.totalCpus = totalCpus; + public void totalCpus(int availProcs) { + this.availProcs = availProcs; } /** * Sets current CPU load. * - * @param curCpuLoad Current CPU load. + * @param load Current CPU load. */ - public void currentCpuLoad(double curCpuLoad) { - this.curCpuLoad = curCpuLoad; + public void currentCpuLoad(double load) { + this.load = load; } /** * Sets CPU load average over the metrics history. * - * @param avgCpuLoad CPU load average. + * @param avgLoad CPU load average. */ - public void averageCpuLoad(double avgCpuLoad) { - this.avgCpuLoad = avgCpuLoad; + public void averageCpuLoad(double avgLoad) { + this.avgLoad = avgLoad; } /** * Sets current GC load. * - * @param curGcCpuLoad Current GC load. + * @param gcLoad Current GC load. */ - public void currentGcCpuLoad(double curGcCpuLoad) { - this.curGcCpuLoad = curGcCpuLoad; + public void currentGcCpuLoad(double gcLoad) { + this.gcLoad = gcLoad; } /** @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) { * @param curPmeDuration Execution duration for current partition map exchange. */ public void currentPmeDuration(long curPmeDuration) { - this.curPmeDuration = curPmeDuration; + this.currentPmeDuration = curPmeDuration; } /** @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map> neighborhood) { * @param neighborhood Cluster neighborhood. * @return CPU load. */ - private static double currentCpuLoad(Map> neighborhood) { - double curCpuLoad = 0.0; + private static int cpus(Map> neighborhood) { + int cpus = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - curCpuLoad += first.metrics().getCurrentCpuLoad(); + cpus += first.metrics().getCurrentCpuLoad(); } - return curCpuLoad; + return cpus; } /** * @param neighborhood Cluster neighborhood. * @return GC CPU load. */ - private static double currentGcCpuLoad(Map> neighborhood) { - double curGcCpuLoad = 0; + private static int gcCpus(Map> neighborhood) { + int cpus = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - curGcCpuLoad += first.metrics().getCurrentGcCpuLoad(); + cpus += first.metrics().getCurrentGcCpuLoad(); } - return curGcCpuLoad; + return cpus; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java index dc60d51f0d13f..c1f73f47d2cee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/tracing/messages/SpanContainer.java @@ -46,7 +46,7 @@ public byte[] serializedSpanBytes() { * @param serializedSpan Serialized span. */ public void serializedSpanBytes(byte[] serializedSpan) { - serializedSpanBytes = serializedSpan.clone(); + this.serializedSpanBytes = serializedSpan.clone(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 44ca6f0093f7c..aef67a57809d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -72,7 +72,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.NodeValidationFailedEvent; import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -7581,7 +7580,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); + wrk.metrics(msg.metrics()); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 00f7f69b8d4da..92a20088bf4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,15 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - // IOException type is important for ServerImpl for connection error processing behavior. - // It may search the cause (X.hasCause). + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -187,48 +184,40 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - byte[] unprocessedBytes = null; - int unprocessedBytesLen = 0; + msgBuf.clear(); do { - // Should be cleared before first operation. - msgBuf.clear(); - - if (unprocessedBytes != null) { - assert unprocessedBytesLen == unprocessedBytes.length; - - msgBuf.put(unprocessedBytes); - - unprocessedBytes = null; - } - int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - if (unprocessedBytesLen > 0) { - msgBuf.rewind(); + msgBuf.limit(msgBuf.position() > 0 ? msgBuf.position() + read + 1 : read); - msgBuf.limit(read + unprocessedBytesLen); + finished = msgSer.readFrom(msg, msgReader); - unprocessedBytesLen = 0; - } - else - msgBuf.limit(read); + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; - finished = msgSer.readFrom(msg, msgReader); + if (finished) + break; // We must keep the uprocessed bytes read from the socket. It won't return them again. - if (!finished && msgBuf.position() > 0 && msgBuf.remaining() > 0) { - unprocessedBytes = new byte[msgBuf.remaining()]; - - unprocessedBytesLen = unprocessedBytes.length; + byte[] unprocessedTail = null; - msgBuf.get(unprocessedBytes, 0, msgBuf.remaining()); + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); } + + msgBuf.clear(); + + if (unprocessedTail != null) + msgBuf.put(unprocessedTail); } - while (!finished); + while (true); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index 3df159a48e806..c76fc8694d9b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -57,7 +57,7 @@ protected TcpDiscoveryAbstractTraceableMessage(UUID creatorNodeId) { protected TcpDiscoveryAbstractTraceableMessage(TcpDiscoveryAbstractTraceableMessage msg) { super(msg); - spanContainer = msg.spanContainer; + this.spanContainer = msg.spanContainer; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 144ada143e9df..8092ef3a7255f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -19,28 +19,20 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; /** * Metrics update message. *

* Client sends his metrics in this message. */ -public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { /** */ private static final long serialVersionUID = 0L; /** */ - @Order(value = 5, method = "metricsMessage") - private TcpDiscoveryClusterMetricsHolderMessage metricsMsg; - - /** Constructor for {@link DiscoveryMessageFactory}. */ - public TcpDiscoveryClientMetricsUpdateMessage() { - // No-op. - } + private final byte[] metrics; /** * Constructor. @@ -51,30 +43,16 @@ public TcpDiscoveryClientMetricsUpdateMessage() { public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - metricsMsg = new TcpDiscoveryClusterMetricsHolderMessage(metrics); - } - - /** - * Gets the metrics message. - * - * @return Metrics holder message. - */ - public TcpDiscoveryClusterMetricsHolderMessage metricsMessage() { - return metricsMsg; + this.metrics = ClusterMetricsSnapshot.serialize(metrics); } /** - * Sets the metrics message. + * Gets metrics map. * - * @param metricsMsg Metrics holder message. + * @return Metrics map. */ - public void metricsMessage(TcpDiscoveryClusterMetricsHolderMessage metricsMsg) { - this.metricsMsg = metricsMsg; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 11; + public ClusterMetrics metrics() { + return ClusterMetricsSnapshot.deserialize(metrics, 0); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index aec40df3a900d..5d6df69b715b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,9 +18,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; -import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -28,15 +26,10 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { /** */ private static final long serialVersionUID = 0L; - /** Default constructor for {@link DiscoveryMessageFactory}. */ - public TcpDiscoveryNodeLeftMessage() { - // No-op. - } - /** * Constructor. * @@ -50,9 +43,4 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); } - - /** */ - @Override public short directType() { - return 11; - } } From c86614457cf213b1dc75622e237a4c78cebf68d7 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 3 Jan 2026 02:01:12 +0300 Subject: [PATCH 7/8] message reading fix --- .../ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 92a20088bf4c5..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -192,7 +192,14 @@ T readMessage() throws IgniteCheckedException, IOException { if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(msgBuf.position() > 0 ? msgBuf.position() + read + 1 : read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); From fe4210b416cc0713344851b7a19987001f37654c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 3 Jan 2026 02:19:05 +0300 Subject: [PATCH 8/8] + TcpDiscoveryNodeLeftMessage --- .../discovery/DiscoveryMessageFactory.java | 3 +++ .../tcp/messages/TcpDiscoveryNodeLeftMessage.java | 14 +++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1c01592fd0dd5..3a2844af1ac57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -46,6 +47,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -70,5 +72,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); + factory.register((short)13, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 5d6df69b715b7..b6ec6ff137d01 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -26,10 +28,15 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + /** * Constructor. * @@ -39,6 +46,11 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { super(creatorNodeId); } + /** {@inheritDoc} */ + @Override public short directType() { + return 13; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString());