Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
Expand All @@ -46,6 +47,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
Expand All @@ -70,5 +72,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer());
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer());
factory.register((short)13, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ <T> T readMessage() throws IgniteCheckedException, IOException {
if (MESSAGE_SERIALIZATION != serMode) {
detectSslAlert(serMode, in);

throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode);
// IOException type is important for ServerImpl. It may search the cause (X.hasCause).
// The connection error processing behavior depends on it.
throw new IOException("Received unexpected byte while reading discovery message: " + serMode);
}

byte b0 = (byte)in.read();
byte b1 = (byte)in.read();

Message msg = spi.messageFactory().create(makeMessageType(b0, b1));
Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));

msgReader.reset();
msgReader.setBuffer(msgBuf);
Expand All @@ -185,19 +184,47 @@ <T> T readMessage() throws IgniteCheckedException, IOException {

boolean finished;

do {
// Should be cleared before first operation.
msgBuf.clear();
msgBuf.clear();

int read = in.read(msgBuf.array(), 0, msgBuf.limit());
do {
int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining());

if (read == -1)
throw new EOFException("Connection closed before message was fully read.");

msgBuf.limit(read);
if (msgBuf.position() > 0) {
msgBuf.limit(msgBuf.position() + read);

// We've stored an unprocessed tail before.
msgBuf.rewind();
}
else
msgBuf.limit(read);

finished = msgSer.readFrom(msg, msgReader);
} while (!finished);

// We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one.
// This behaviour guarantees that we never read a next message from the buffer right after the end of
// the previous message.
assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed.";

if (finished)
break;

// We must keep the uprocessed bytes read from the socket. It won't return them again.
byte[] unprocessedTail = null;

if (msgBuf.remaining() > 0) {
unprocessedTail = new byte[msgBuf.remaining()];
msgBuf.get(unprocessedTail, 0, msgBuf.remaining());
}

msgBuf.clear();

if (unprocessedTail != null)
msgBuf.put(unprocessedTail);
}
while (true);

return (T)msg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.ignite.spi.discovery.tcp.messages;

import java.io.Externalizable;
import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage;
import org.jetbrains.annotations.Nullable;

/**
* Abstract traceable message for TCP discovery.
Expand All @@ -29,8 +31,12 @@ public abstract class TcpDiscoveryAbstractTraceableMessage extends TcpDiscoveryA
/** Container. */
private SpanContainer spanContainer = new SpanContainer();

/** Serialization holder of {@link #spanContainer}'s bytes. */
@Order(value = 5, method = "spanBytes")
private @Nullable byte[] spanBytesHolder;

/**
* Default no-arg constructor for {@link Externalizable} interface.
* Default constructor for {@link DiscoveryMessageFactory}.
*/
protected TcpDiscoveryAbstractTraceableMessage() {
// No-op.
Expand Down Expand Up @@ -67,6 +73,21 @@ public Object readResolve() {
return this;
}

/** @return {@link #spanContainer}'s bytes. */
public @Nullable byte[] spanBytes() {
return spanContainer == null ? null : spanContainer.serializedSpanBytes();
}

/** @param spanBytes {@link #spanContainer}'s bytes. */
public void spanBytes(@Nullable byte[] spanBytes) {
if (spanBytes == null)
return;

readResolve();

spanContainer.serializedSpanBytes(spanBytes);
}

/** {@inheritDoc} */
@Override public SpanContainer spanContainer() {
return spanContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
package org.apache.ignite.spi.discovery.tcp.messages;

import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
* Sent by node that is stopping to coordinator across the ring,
* then sent by coordinator across the ring.
*/
@TcpDiscoveryEnsureDelivery
@TcpDiscoveryRedirectToClient
public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage {
public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message {
/** */
private static final long serialVersionUID = 0L;

/** Constructor for {@link DiscoveryMessageFactory}. */
public TcpDiscoveryNodeLeftMessage() {
// No-op.
}

/**
* Constructor.
*
Expand All @@ -39,6 +46,11 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) {
super(creatorNodeId);
}

/** {@inheritDoc} */
@Override public short directType() {
return 13;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString());
Expand Down