diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1c01592fd0dd5..3a2844af1ac57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -46,6 +47,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -70,5 +72,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); + factory.register((short)13, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,13 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -185,19 +184,47 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - do { - // Should be cleared before first operation. - msgBuf.clear(); + msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + do { + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; + + if (finished) + break; + + // We must keep the uprocessed bytes read from the socket. It won't return them again. + byte[] unprocessedTail = null; + + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); + } + + msgBuf.clear(); + + if (unprocessedTail != null) + msgBuf.put(unprocessedTail); + } + while (true); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index 433566d7e3021..c76fc8694d9b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -17,10 +17,12 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; +import org.jetbrains.annotations.Nullable; /** * Abstract traceable message for TCP discovery. @@ -29,8 +31,12 @@ public abstract class TcpDiscoveryAbstractTraceableMessage extends TcpDiscoveryA /** Container. */ private SpanContainer spanContainer = new SpanContainer(); + /** Serialization holder of {@link #spanContainer}'s bytes. */ + @Order(value = 5, method = "spanBytes") + private @Nullable byte[] spanBytesHolder; + /** - * Default no-arg constructor for {@link Externalizable} interface. + * Default constructor for {@link DiscoveryMessageFactory}. */ protected TcpDiscoveryAbstractTraceableMessage() { // No-op. @@ -67,6 +73,21 @@ public Object readResolve() { return this; } + /** @return {@link #spanContainer}'s bytes. */ + public @Nullable byte[] spanBytes() { + return spanContainer == null ? null : spanContainer.serializedSpanBytes(); + } + + /** @param spanBytes {@link #spanContainer}'s bytes. */ + public void spanBytes(@Nullable byte[] spanBytes) { + if (spanBytes == null) + return; + + readResolve(); + + spanContainer.serializedSpanBytes(spanBytes); + } + /** {@inheritDoc} */ @Override public SpanContainer spanContainer() { return spanContainer; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 5d6df69b715b7..b6ec6ff137d01 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -26,10 +28,15 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + /** * Constructor. * @@ -39,6 +46,11 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { super(creatorNodeId); } + /** {@inheritDoc} */ + @Override public short directType() { + return 13; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString());