Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
Expand All @@ -29,6 +30,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
Expand All @@ -38,6 +40,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
Expand All @@ -46,6 +49,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
Expand All @@ -54,6 +58,7 @@
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());

Expand All @@ -70,5 +75,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer());
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer());
factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,20 @@ public class NodeMetricsMessage implements Message {
private long curIdleTime = -1;

/** */
@Order(value = 25, method = "totalCpus")
private int availProcs = -1;
@Order(value = 25)
private int totalCpus = -1;

/** */
@Order(value = 26, method = "currentCpuLoad")
private double load = -1;
private double curCpuLoad = -1;

/** */
@Order(value = 27, method = "averageCpuLoad")
private double avgLoad = -1;
private double avgCpuLoad = -1;

/** */
@Order(value = 28, method = "currentGcCpuLoad")
private double gcLoad = -1;
private double curGcCpuLoad = -1;

/** */
@Order(value = 29, method = "heapMemoryInitialized")
Expand All @@ -173,15 +173,15 @@ public class NodeMetricsMessage implements Message {
private long heapTotal = -1;

/** */
@Order(value = 34, method = "heapMemoryInitialized")
@Order(value = 34, method = "nonHeapMemoryInitialized")
private long nonHeapInit = -1;

/** */
@Order(value = 35, method = "heapMemoryUsed")
@Order(value = 35, method = "nonHeapMemoryUsed")
private long nonHeapUsed = -1;

/** */
@Order(value = 36, method = "heapMemoryCommitted")
@Order(value = 36, method = "nonHeapMemoryCommitted")
private long nonHeapCommitted = -1;

/** */
Expand Down Expand Up @@ -253,8 +253,8 @@ public class NodeMetricsMessage implements Message {
private long totalJobsExecTime = -1;

/** */
@Order(value = 54)
private long currentPmeDuration = -1;
@Order(value = 54, method = "currentPmeDuration")
private long curPmeDuration = -1;

/** */
public NodeMetricsMessage() {
Expand Down Expand Up @@ -295,10 +295,10 @@ public NodeMetricsMessage(Collection<ClusterNode> nodes) {
totalExecTasks = 0;
totalIdleTime = 0;
curIdleTime = 0;
availProcs = 0;
load = 0;
avgLoad = 0;
gcLoad = 0;
totalCpus = 0;
curCpuLoad = 0;
avgCpuLoad = 0;
curGcCpuLoad = 0;
heapInit = 0;
heapUsed = 0;
heapCommitted = 0;
Expand All @@ -323,7 +323,7 @@ public NodeMetricsMessage(Collection<ClusterNode> nodes) {
outMesQueueSize = 0;
heapTotal = 0;
totalNodes = nodes.size();
currentPmeDuration = 0;
curPmeDuration = 0;

for (ClusterNode node : nodes) {
ClusterMetrics m = node.metrics();
Expand Down Expand Up @@ -399,9 +399,9 @@ public NodeMetricsMessage(Collection<ClusterNode> nodes) {
rcvdBytesCnt += m.getReceivedBytesCount();
outMesQueueSize += m.getOutboundMessagesQueueSize();

avgLoad += m.getCurrentCpuLoad();
avgCpuLoad += m.getCurrentCpuLoad();

currentPmeDuration = max(currentPmeDuration, m.getCurrentPmeDuration());
curPmeDuration = max(curPmeDuration, m.getCurrentPmeDuration());
}

curJobExecTime /= size;
Expand All @@ -412,7 +412,7 @@ public NodeMetricsMessage(Collection<ClusterNode> nodes) {
avgWaitingJobs /= size;
avgJobExecTime /= size;
avgJobWaitTime /= size;
avgLoad /= size;
avgCpuLoad /= size;

if (!F.isEmpty(nodes)) {
ClusterMetrics oldestNodeMetrics = oldest(nodes).metrics();
Expand All @@ -423,9 +423,9 @@ public NodeMetricsMessage(Collection<ClusterNode> nodes) {

Map<String, Collection<ClusterNode>> neighborhood = U.neighborhood(nodes);

gcLoad = gcCpus(neighborhood);
load = cpus(neighborhood);
availProcs = cpuCnt(neighborhood);
curGcCpuLoad = currentGcCpuLoad(neighborhood);
curCpuLoad = currentCpuLoad(neighborhood);
totalCpus = cpuCnt(neighborhood);
}

/** */
Expand Down Expand Up @@ -464,10 +464,10 @@ public NodeMetricsMessage(ClusterMetrics metrics) {
curIdleTime = metrics.getCurrentIdleTime();
totalIdleTime = metrics.getTotalIdleTime();

availProcs = metrics.getTotalCpus();
load = metrics.getCurrentCpuLoad();
avgLoad = metrics.getAverageCpuLoad();
gcLoad = metrics.getCurrentGcCpuLoad();
totalCpus = metrics.getTotalCpus();
curCpuLoad = metrics.getCurrentCpuLoad();
avgCpuLoad = metrics.getAverageCpuLoad();
curGcCpuLoad = metrics.getCurrentGcCpuLoad();

heapInit = metrics.getHeapMemoryInitialized();
heapUsed = metrics.getHeapMemoryUsed();
Expand All @@ -487,7 +487,7 @@ public NodeMetricsMessage(ClusterMetrics metrics) {

lastDataVer = metrics.getLastDataVersion();

currentPmeDuration = metrics.getCurrentPmeDuration();
curPmeDuration = metrics.getCurrentPmeDuration();

totalNodes = metrics.getTotalNodes();

Expand Down Expand Up @@ -885,22 +885,22 @@ public void currentIdleTime(long curIdleTime) {

/** */
public int totalCpus() {
return availProcs;
return totalCpus;
}

/** */
public double currentCpuLoad() {
return load;
return curCpuLoad;
}

/** */
public double averageCpuLoad() {
return avgLoad;
return avgCpuLoad;
}

/** */
public double currentGcCpuLoad() {
return gcLoad;
return curGcCpuLoad;
}

/** */
Expand Down Expand Up @@ -1020,43 +1020,43 @@ public int totalNodes() {

/** */
public long currentPmeDuration() {
return currentPmeDuration;
return curPmeDuration;
}

/**
* Sets available processors.
*
* @param availProcs Available processors.
* @param totalCpus Available processors.
*/
public void totalCpus(int availProcs) {
this.availProcs = availProcs;
public void totalCpus(int totalCpus) {
this.totalCpus = totalCpus;
}

/**
* Sets current CPU load.
*
* @param load Current CPU load.
* @param curCpuLoad Current CPU load.
*/
public void currentCpuLoad(double load) {
this.load = load;
public void currentCpuLoad(double curCpuLoad) {
this.curCpuLoad = curCpuLoad;
}

/**
* Sets CPU load average over the metrics history.
*
* @param avgLoad CPU load average.
* @param avgCpuLoad CPU load average.
*/
public void averageCpuLoad(double avgLoad) {
this.avgLoad = avgLoad;
public void averageCpuLoad(double avgCpuLoad) {
this.avgCpuLoad = avgCpuLoad;
}

/**
* Sets current GC load.
*
* @param gcLoad Current GC load.
* @param curGcCpuLoad Current GC load.
*/
public void currentGcCpuLoad(double gcLoad) {
this.gcLoad = gcLoad;
public void currentGcCpuLoad(double curGcCpuLoad) {
this.curGcCpuLoad = curGcCpuLoad;
}

/**
Expand Down Expand Up @@ -1263,7 +1263,7 @@ public void totalNodes(int totalNodes) {
* @param curPmeDuration Execution duration for current partition map exchange.
*/
public void currentPmeDuration(long curPmeDuration) {
this.currentPmeDuration = curPmeDuration;
this.curPmeDuration = curPmeDuration;
}

/**
Expand Down Expand Up @@ -1308,36 +1308,36 @@ private static int cpuCnt(Map<String, Collection<ClusterNode>> neighborhood) {
* @param neighborhood Cluster neighborhood.
* @return CPU load.
*/
private static int cpus(Map<String, Collection<ClusterNode>> neighborhood) {
int cpus = 0;
private static double currentCpuLoad(Map<String, Collection<ClusterNode>> neighborhood) {
double curCpuLoad = 0.0;

for (Collection<ClusterNode> nodes : neighborhood.values()) {
ClusterNode first = F.first(nodes);

// Projection can be empty if all nodes in it failed.
if (first != null)
cpus += first.metrics().getCurrentCpuLoad();
curCpuLoad += first.metrics().getCurrentCpuLoad();
}

return cpus;
return curCpuLoad;
}

/**
* @param neighborhood Cluster neighborhood.
* @return GC CPU load.
*/
private static int gcCpus(Map<String, Collection<ClusterNode>> neighborhood) {
int cpus = 0;
private static double currentGcCpuLoad(Map<String, Collection<ClusterNode>> neighborhood) {
double curGcCpuLoad = 0;

for (Collection<ClusterNode> nodes : neighborhood.values()) {
ClusterNode first = F.first(nodes);

// Projection can be empty if all nodes in it failed.
if (first != null)
cpus += first.metrics().getCurrentGcCpuLoad();
curGcCpuLoad += first.metrics().getCurrentGcCpuLoad();
}

return cpus;
return curGcCpuLoad;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.NodeValidationFailedEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
Expand Down Expand Up @@ -7580,7 +7581,7 @@ private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMe
ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId());

if (wrk != null)
wrk.metrics(msg.metrics());
wrk.metrics(new ClusterMetricsSnapshot(msg.metricsMessage()));
else if (log.isDebugEnabled())
log.debug("Received client metrics update message from unknown client node: " + msg);
}
Expand Down
Loading