diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1c01592fd0dd5..5f399ed0c46fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; @@ -29,6 +30,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -38,6 +40,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; +import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; @@ -46,6 +49,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -54,6 +58,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer()); factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); @@ -70,5 +75,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); + factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); } } diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java index f11ea1d48c0ba..55b51cb041dba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeMetricsMessage.java @@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message { private long curIdleTime = -1; /** */ - @Order(value = 25, method = "totalCpus") - private int availProcs = -1; + @Order(value = 25) + private int totalCpus = -1; /** */ @Order(value = 26, method = "currentCpuLoad") - private double load = -1; + private double curCpuLoad = -1; /** */ @Order(value = 27, method = "averageCpuLoad") - private double avgLoad = -1; + private double avgCpuLoad = -1; /** */ @Order(value = 28, method = "currentGcCpuLoad") - private double gcLoad = -1; + private double curGcCpuLoad = -1; /** */ @Order(value = 29, method = "heapMemoryInitialized") @@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message { private long heapTotal = -1; /** */ - @Order(value = 34, method = "heapMemoryInitialized") + @Order(value = 34, method = "nonHeapMemoryInitialized") private long nonHeapInit = -1; /** */ - @Order(value = 35, method = "heapMemoryUsed") + @Order(value = 35, method = "nonHeapMemoryUsed") private long nonHeapUsed = -1; /** */ - @Order(value = 36, method = "heapMemoryCommitted") + @Order(value = 36, method = "nonHeapMemoryCommitted") private long nonHeapCommitted = -1; /** */ @@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message { private long totalJobsExecTime = -1; /** */ - @Order(value = 54) - private long currentPmeDuration = -1; + @Order(value = 54, method = "currentPmeDuration") + private long curPmeDuration = -1; /** */ public NodeMetricsMessage() { @@ -295,10 +295,10 @@ public 
NodeMetricsMessage(Collection nodes) { totalExecTasks = 0; totalIdleTime = 0; curIdleTime = 0; - availProcs = 0; - load = 0; - avgLoad = 0; - gcLoad = 0; + totalCpus = 0; + curCpuLoad = 0; + avgCpuLoad = 0; + curGcCpuLoad = 0; heapInit = 0; heapUsed = 0; heapCommitted = 0; @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection nodes) { outMesQueueSize = 0; heapTotal = 0; totalNodes = nodes.size(); - currentPmeDuration = 0; + curPmeDuration = 0; for (ClusterNode node : nodes) { ClusterMetrics m = node.metrics(); @@ -399,9 +399,9 @@ public NodeMetricsMessage(Collection nodes) { rcvdBytesCnt += m.getReceivedBytesCount(); outMesQueueSize += m.getOutboundMessagesQueueSize(); - avgLoad += m.getCurrentCpuLoad(); + avgCpuLoad += m.getCurrentCpuLoad(); - currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration()); + curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration()); } curJobExecTime /= size; @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection nodes) { avgWaitingJobs /= size; avgJobExecTime /= size; avgJobWaitTime /= size; - avgLoad /= size; + avgCpuLoad /= size; if (!F.isEmpty(nodes)) { ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics(); @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection nodes) { Map> neighborhood = U.neighborhood(nodes); - gcLoad = gcCpus(neighborhood); - load = cpus(neighborhood); - availProcs = cpuCnt(neighborhood); + curGcCpuLoad = currentGcCpuLoad(neighborhood); + curCpuLoad = currentCpuLoad(neighborhood); + totalCpus = cpuCnt(neighborhood); } /** */ @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) { curIdleTime = metrics.getCurrentIdleTime(); totalIdleTime = metrics.getTotalIdleTime(); - availProcs = metrics.getTotalCpus(); - load = metrics.getCurrentCpuLoad(); - avgLoad = metrics.getAverageCpuLoad(); - gcLoad = metrics.getCurrentGcCpuLoad(); + totalCpus = metrics.getTotalCpus(); + curCpuLoad = metrics.getCurrentCpuLoad(); + avgCpuLoad = metrics.getAverageCpuLoad(); + curGcCpuLoad = 
metrics.getCurrentGcCpuLoad(); heapInit = metrics.getHeapMemoryInitialized(); heapUsed = metrics.getHeapMemoryUsed(); @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) { lastDataVer = metrics.getLastDataVersion(); - currentPmeDuration = metrics.getCurrentPmeDuration(); + curPmeDuration = metrics.getCurrentPmeDuration(); totalNodes = metrics.getTotalNodes(); @@ -885,22 +885,22 @@ public void currentIdleTime(long curIdleTime) { /** */ public int totalCpus() { - return availProcs; + return totalCpus; } /** */ public double currentCpuLoad() { - return load; + return curCpuLoad; } /** */ public double averageCpuLoad() { - return avgLoad; + return avgCpuLoad; } /** */ public double currentGcCpuLoad() { - return gcLoad; + return curGcCpuLoad; } /** */ @@ -1020,43 +1020,43 @@ public int totalNodes() { /** */ public long currentPmeDuration() { - return currentPmeDuration; + return curPmeDuration; } /** * Sets available processors. * - * @param availProcs Available processors. + * @param totalCpus Available processors. */ - public void totalCpus(int availProcs) { - this.availProcs = availProcs; + public void totalCpus(int totalCpus) { + this.totalCpus = totalCpus; } /** * Sets current CPU load. * - * @param load Current CPU load. + * @param curCpuLoad Current CPU load. */ - public void currentCpuLoad(double load) { - this.load = load; + public void currentCpuLoad(double curCpuLoad) { + this.curCpuLoad = curCpuLoad; } /** * Sets CPU load average over the metrics history. * - * @param avgLoad CPU load average. + * @param avgCpuLoad CPU load average. */ - public void averageCpuLoad(double avgLoad) { - this.avgLoad = avgLoad; + public void averageCpuLoad(double avgCpuLoad) { + this.avgCpuLoad = avgCpuLoad; } /** * Sets current GC load. * - * @param gcLoad Current GC load. + * @param curGcCpuLoad Current GC load. 
*/ - public void currentGcCpuLoad(double gcLoad) { - this.gcLoad = gcLoad; + public void currentGcCpuLoad(double curGcCpuLoad) { + this.curGcCpuLoad = curGcCpuLoad; } /** @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) { * @param curPmeDuration Execution duration for current partition map exchange. */ public void currentPmeDuration(long curPmeDuration) { - this.currentPmeDuration = curPmeDuration; + this.curPmeDuration = curPmeDuration; } /** @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map> neighborhood) { * @param neighborhood Cluster neighborhood. * @return CPU load. */ - private static int cpus(Map> neighborhood) { - int cpus = 0; + private static double currentCpuLoad(Map> neighborhood) { + double curCpuLoad = 0.0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. if (first != null) - cpus += first.metrics().getCurrentCpuLoad(); + curCpuLoad += first.metrics().getCurrentCpuLoad(); } - return cpus; + return curCpuLoad; } /** * @param neighborhood Cluster neighborhood. * @return GC CPU load. */ - private static int gcCpus(Map> neighborhood) { - int cpus = 0; + private static double currentGcCpuLoad(Map> neighborhood) { + double curGcCpuLoad = 0; for (Collection nodes : neighborhood.values()) { ClusterNode first = F.first(nodes); // Projection can be empty if all nodes in it failed. 
if (first != null) - cpus += first.metrics().getCurrentGcCpuLoad(); + curGcCpuLoad += first.metrics().getCurrentGcCpuLoad(); } - return cpus; + return curGcCpuLoad; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index aef67a57809d8..44ca6f0093f7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.NodeValidationFailedEvent; import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -7580,7 +7581,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); if (wrk != null) - wrk.metrics(msg.metrics()); + wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage())); else if (log.isDebugEnabled()) log.debug("Received client metrics update message from unknown client node: " + msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,13 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new 
IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -185,19 +184,47 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - do { - // Should be cleared before first operation. - msgBuf.clear(); + msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + do { + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; + + if (finished) + break; + + // We must keep the unprocessed bytes read from the socket. It won't return them again. 
+ byte[] unprocessedTail = null; + + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); + } + + msgBuf.clear(); + + if (unprocessedTail != null) + msgBuf.put(unprocessedTail); + } + while (true); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java index 8092ef3a7255f..f110c40e8cb92 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -19,20 +19,28 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Metrics update message. *

* Client sends his metrics in this message. */ -public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final byte[] metrics; + @Order(value = 5, method = "metricsMessage") + private TcpDiscoveryNodeMetricsMessage metricsMsg; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClientMetricsUpdateMessage() { + // No-op. + } /** * Constructor. @@ -43,16 +51,30 @@ public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstract public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { super(creatorNodeId); - this.metrics = ClusterMetricsSnapshot.serialize(metrics); + metricsMsg = new TcpDiscoveryNodeMetricsMessage(metrics); + } + + /** + * Gets the metrics message. + * + * @return Metrics holder message. + */ + public TcpDiscoveryNodeMetricsMessage metricsMessage() { + return metricsMsg; } /** - * Gets metrics map. + * Sets the metrics message. * - * @return Metrics map. + * @param metricsMsg Metrics holder message. 
*/ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); + public void metricsMessage(TcpDiscoveryNodeMetricsMessage metricsMsg) { + this.metricsMsg = metricsMsg; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 13; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java new file mode 100644 index 0000000000000..0c12acee04a2b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link NodeMetricsMessage} in Discovery as it is registered in a message factory of the + * Communication component and thus is unavailable in Discovery. We have to extend {@link NodeMetricsMessage} and + * register this subclass in the message factory of the Discovery component, see {@link DiscoveryMessageFactory}. + */ +public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeMetricsMessage() { + // No-op. + } + + /** @param metrics Metrics. */ + public TcpDiscoveryNodeMetricsMessage(ClusterMetrics metrics) { + super(metrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -102; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", super.toString()); + } +}