Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
Expand All @@ -29,6 +30,9 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodesMetricsMapMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
Expand All @@ -37,6 +41,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
Expand All @@ -46,6 +51,9 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
Expand All @@ -54,6 +62,11 @@
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
new TcpDiscoveryNodeFullMetricsMessageSerializer());
factory.register((short)-104, TcpDiscoveryNodesMetricsMapMessage::new, new TcpDiscoveryNodesMetricsMapMessageSerializer());
factory.register((short)-103, TcpDiscoveryCacheMetricsMessage::new, new TcpDiscoveryCacheMetricsMessageSerializer());
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ class ClusterNodeMetrics {

/** */
public ClusterNodeMetrics(NodeFullMetricsMessage msg) {
nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMsg());
nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMessage());

cacheMetrics = new HashMap<>(msg.cachesMetrics().size(), 1.0f);
cacheMetrics = new HashMap<>(msg.cachesMetricsMessages().size(), 1.0f);

msg.cachesMetrics().entrySet().forEach(e -> cacheMetrics.put(e.getKey(), new CacheMetricsSnapshot(e.getValue())));
msg.cachesMetricsMessages().forEach((key, value) -> cacheMetrics.put(key, new CacheMetricsSnapshot(value)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@
import org.apache.ignite.plugin.extensions.communication.Message;

/** Node compound metrics message. */
public final class NodeFullMetricsMessage implements Message {
public class NodeFullMetricsMessage implements Message {
/** */
public static final short TYPE_CODE = 138;

/** Node metrics wrapper message. */
@Order(0)
@Order(value = 0, method = "nodeMetricsMessage")
private NodeMetricsMessage nodeMetricsMsg;

/** Cache metrics wrapper message. */
@Order(1)
private Map<Integer, CacheMetricsMessage> cachesMetrics;
@Order(value = 1, method = "cachesMetricsMessages")
private Map<Integer, CacheMetricsMessage> cachesMetricsMsgs;

/** Empty constructor for {@link GridIoMessageFactory}. */
public NodeFullMetricsMessage() {
Expand All @@ -46,30 +46,40 @@ public NodeFullMetricsMessage() {

/** */
public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map<Integer, CacheMetrics> cacheMetrics) {
nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics);
nodeMetricsMsg = createNodeMetricsMessage(nodeMetrics);

cachesMetrics = new HashMap<>(cacheMetrics.size(), 1.0f);
cachesMetricsMsgs = new HashMap<>(cacheMetrics.size(), 1.0f);

cacheMetrics.forEach((key, value) -> cachesMetrics.put(key, new CacheMetricsMessage(value)));
cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, createCacheMetricsMessage(value)));
}

/** */
public Map<Integer, CacheMetricsMessage> cachesMetrics() {
return cachesMetrics;
protected NodeMetricsMessage createNodeMetricsMessage(ClusterMetrics nodeMetrics) {
return new NodeMetricsMessage(nodeMetrics);
}

/** */
public void cachesMetrics(Map<Integer, CacheMetricsMessage> cacheMetricsMsg) {
cachesMetrics = cacheMetricsMsg;
protected CacheMetricsMessage createCacheMetricsMessage(CacheMetrics cacheMetrics) {
return new CacheMetricsMessage(cacheMetrics);
}

/** */
public NodeMetricsMessage nodeMetricsMsg() {
public Map<Integer, CacheMetricsMessage> cachesMetricsMessages() {
return cachesMetricsMsgs;
}

/** */
public void cachesMetricsMessages(Map<Integer, CacheMetricsMessage> cacheMetricsMsg) {
cachesMetricsMsgs = cacheMetricsMsg;
}

/** */
public NodeMetricsMessage nodeMetricsMessage() {
return nodeMetricsMsg;
}

/** */
public void nodeMetricsMsg(NodeMetricsMessage nodeMetricsMsg) {
public void nodeMetricsMessage(NodeMetricsMessage nodeMetricsMsg) {
this.nodeMetricsMsg = nodeMetricsMsg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2526,8 +2526,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
log.debug("Received metrics response: " + msg);
}
else {
if (msg.hasMetrics())
processMsgCacheMetrics(msg, System.nanoTime());
if (!F.isEmpty(msg.serversFullMetricsMessages()))
processCacheMetrics(msg, System.nanoTime());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2208,8 +2208,7 @@ private boolean recordable(TcpDiscoveryAbstractMessage msg) {
* @param nodeId Node ID.
*/
private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
msg.removeMetrics(nodeId);
msg.removeCacheMetrics(nodeId);
msg.removeServerMetrics(nodeId);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -6032,30 +6031,30 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {

long tsNanos = System.nanoTime();

if (spiStateCopy() == CONNECTED && msg.hasMetrics())
processMsgCacheMetrics(msg, tsNanos);
if (spiStateCopy() == CONNECTED && !F.isEmpty(msg.serversFullMetricsMessages()))
processCacheMetrics(msg, tsNanos);

if (sendMessageToRemotes(msg)) {
if (laps == 0 && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on coordinator.
msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());
msg.addServerMetrics(locNodeId, spi.metricsProvider.metrics());
msg.addServerCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());

for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) {
UUID nodeId = e.getKey();
ClusterMetrics metrics = e.getValue().metrics();

if (metrics != null)
msg.setClientMetrics(locNodeId, nodeId, metrics);

msg.addClientNodeId(nodeId);
}
}
else {
// Message is on its second ring.
removeMetrics(msg, locNodeId);
msg.removeServerMetrics(locNodeId);

Collection<UUID> clientNodeIds = msg.clientNodeIds();
Collection<UUID> clientNodeIds = F.isEmpty(msg.connectedClientsMetricsMessages())
? Collections.emptySet()
: msg.connectedClientsMetricsMessages().keySet();

for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
if (clientNode.visible()) {
Expand All @@ -6065,9 +6064,7 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
if (!clientNode.clientAliveTimeSet())
clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());

boolean aliveCheck = clientNode.isClientAlive();

if (!aliveCheck && isLocalNodeCoordinator()) {
if (!clientNode.isClientAlive() && isLocalNodeCoordinator()) {
boolean failedNode;

synchronized (mux) {
Expand Down Expand Up @@ -8416,7 +8413,8 @@ else if (laps == 1) {
private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) {
UUID locNodeId = getLocalNodeId();

boolean hasLocMetrics = hasMetrics(msg, locNodeId);
boolean hasLocMetrics = !F.isEmpty(msg.serversFullMetricsMessages())
&& msg.serversFullMetricsMessages().get(locNodeId) != null;

if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && msg.senderNodeId() != null)
return 2;
Expand All @@ -8425,15 +8423,5 @@ else if (msg.senderNodeId() == null || !hasLocMetrics)
else
return 1;
}

/**
* @param msg Metrics update message to check.
* @param nodeId Node ID for which the check should be performed.
* @return {@code True} is the message contains metrics of the node with the provided ID.
* {@code False} otherwise.
*/
private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot;
import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -49,6 +53,8 @@
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -415,27 +421,45 @@ protected void clearNodeSensitiveData(TcpDiscoveryNode node) {
}

/** */
public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();
public void processCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
for (Map.Entry<UUID, TcpDiscoveryNodeFullMetricsMessage> e : msg.serversFullMetricsMessages().entrySet()) {
UUID srvrId = e.getKey();
Map<Integer, CacheMetricsMessage> cacheMetricsMsgs = e.getValue().cachesMetricsMessages();
NodeMetricsMessage srvrMetricsMsg = e.getValue().nodeMetricsMessage();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();
assert srvrMetricsMsg != null;

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.emptyMap();
Map<Integer, CacheMetrics> cacheMetrics;

if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
&& cacheMetrics.size() >= METRICS_QNT_WARN) {
if (!F.isEmpty(cacheMetricsMsgs)) {
cacheMetrics = U.newHashMap(cacheMetricsMsgs.size());

cacheMetricsMsgs.forEach((cacheId, cacheMetricsMsg) ->
cacheMetrics.put(cacheId, new CacheMetricsSnapshot(cacheMetricsMsg)));
}
else
cacheMetrics = Collections.emptyMap();

if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() && cacheMetrics.size() >= METRICS_QNT_WARN) {
log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" +
"To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");

endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT;
}

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);
updateMetrics(srvrId, new ClusterMetricsSnapshot(srvrMetricsMsg), cacheMetrics, tsNanos);

TcpDiscoveryNodesMetricsMapMessage clientsMetricsMsg = F.isEmpty(msg.connectedClientsMetricsMessages())
? null
: msg.connectedClientsMetricsMessages().get(srvrId);

if (clientsMetricsMsg == null)
return;

assert clientsMetricsMsg.nodesMetricsMessages() != null;

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
clientsMetricsMsg.nodesMetricsMessages().forEach((clientId, clientNodeMetricsMsg) ->
updateMetrics(clientId, new ClusterMetricsSnapshot(clientNodeMetricsMsg), cacheMetrics, tsNanos));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
if (MESSAGE_SERIALIZATION != serMode) {
detectSslAlert(serMode, in);

throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode);
// IOException type is important for ServerImpl. It may search the cause (X.hasCause).
// The connection error processing behavior depends on it.
throw new IOException("Received unexpected byte while reading discovery message: " + serMode);
}

byte b0 = (byte)in.read();
byte b1 = (byte)in.read();

Message msg = spi.messageFactory().create(makeMessageType(b0, b1));
Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));

msgReader.reset();
msgReader.setBuffer(msgBuf);
Expand All @@ -185,19 +184,47 @@ <T> T readMessage() throws IgniteCheckedException, IOException {

boolean finished;

do {
// Should be cleared before first operation.
msgBuf.clear();
msgBuf.clear();

int read = in.read(msgBuf.array(), 0, msgBuf.limit());
do {
int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining());

if (read == -1)
throw new EOFException("Connection closed before message was fully read.");

msgBuf.limit(read);
if (msgBuf.position() > 0) {
msgBuf.limit(msgBuf.position() + read);

// We've stored an unprocessed tail before.
msgBuf.rewind();
}
else
msgBuf.limit(read);

finished = msgSer.readFrom(msg, msgReader);
} while (!finished);

// We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one.
// This behaviour guarantees that we never read a next message from the buffer right after the end of
// the previous message.
assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed.";

if (finished)
break;

// We must keep the uprocessed bytes read from the socket. It won't return them again.
byte[] unprocessedTail = null;

if (msgBuf.remaining() > 0) {
unprocessedTail = new byte[msgBuf.remaining()];
msgBuf.get(unprocessedTail, 0, msgBuf.remaining());
}

msgBuf.clear();

if (unprocessedTail != null)
msgBuf.put(unprocessedTail);
}
while (true);

return (T)msg;
}
Expand Down
Loading