Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1830,6 +1830,15 @@ public final class IgniteSystemProperties extends IgniteCommonsSystemProperties
"when value IgniteConfiguration#getLocalHost is ip, for backward compatibility")
public static final String IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES = "IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES";

/**
* When set to positive number warning will be produced when outgoing message queue size of TCP communication SPI
* exeeds provided value.
* Default is {@code 0} (do not print warning).
*/
@SystemProperty(value = "When set to positive number warning will be produced when outgoing message queue size of " +
"TCP communication SPI exeeds provided value. Default is 0 (do not print warning).", type = Integer.class)
public static final String IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE = "IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE";

/**
* When above zero, prints tx key collisions once per interval.
* Each transaction besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public class GridNioServer<T> {
public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
= "Total number of messages waiting to be sent over all connections";

/** */
public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME = "maxOutboundMessagesQueueSize";

/** */
public static final String MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Maximum number of messages waiting to be sent";

/** */
public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
Expand All @@ -39,6 +40,8 @@
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName;
import static org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC;
import static org.apache.ignite.internal.util.nio.GridNioServer.MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME;
import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC;
import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME;

Expand Down Expand Up @@ -92,6 +95,9 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr
/** Outbound messages queue size metric. */
@Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric;

/** Maximum outbound messages queue size metric. */
@Nullable private final MaxValueMetric maxMessagesQueueSizeMetric;

/**
* Creates session instance.
*
Expand Down Expand Up @@ -149,6 +155,13 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr
OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME,
OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC
);

maxMessagesQueueSizeMetric = mreg == null ? null : mreg.maxValueMetric(
MAX_MESSAGES_QUEUE_SIZE_METRIC_NAME,
MAX_MESSAGES_QUEUE_SIZE_METRIC_DESC,
60_000,
5
);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -317,6 +330,14 @@ int offerSystemFuture(SessionWriteRequest writeFut) {
if (outboundMessagesQueueSizeMetric != null)
outboundMessagesQueueSizeMetric.increment();

if (maxMessagesQueueSizeMetric != null) {
int queueSize = queue.sizex();

maxMessagesQueueSizeMetric.update(queueSize);

return queueSize;
}

return queue.sizex();
}

Expand Down Expand Up @@ -347,6 +368,14 @@ int offerFuture(SessionWriteRequest writeFut) {
if (outboundMessagesQueueSizeMetric != null)
outboundMessagesQueueSizeMetric.increment();

if (maxMessagesQueueSizeMetric != null) {
int queueSize = queue.sizex();

maxMessagesQueueSizeMetric.update(queueSize);

return queueSize;
}

return queue.sizex();
}

Expand All @@ -362,6 +391,9 @@ void resend(Collection<SessionWriteRequest> futs) {

if (outboundMessagesQueueSizeMetric != null)
outboundMessagesQueueSizeMetric.add(futs.size());

if (maxMessagesQueueSizeMetric != null)
maxMessagesQueueSizeMetric.update(futs.size());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
Expand Down Expand Up @@ -133,6 +134,9 @@ public class GridNioServerWrapper {
/** Default delay between reconnects attempts in case of temporary network issues. */
private static final int DFLT_RECONNECT_DELAY = 50;

/** Minimum frequency (in milliseconds) of high message queue size warning. */
private static final long MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY = 30_000L;

/** Channel meta used for establishing channel connections. */
static final int CHANNEL_FUT_META = GridNioSessionMetaKey.nextUniqueKey();

Expand Down Expand Up @@ -205,9 +209,13 @@ public class GridNioServerWrapper {
private volatile ThrowableSupplier<SocketChannel, IOException> socketChannelFactory = SocketChannel::open;

/** Enable forcible node kill. */
private boolean forcibleNodeKillEnabled = IgniteSystemProperties
private final boolean forcibleNodeKillEnabled = IgniteSystemProperties
.getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);

/** Message queue size to print warning. */
private final int msgQueueWarningSize = IgniteSystemProperties.getInteger(
IgniteSystemProperties.IGNITE_TCP_COMM_MSG_QUEUE_WARN_SIZE, 0);

/** NIO server. */
private GridNioServer<Message> nioSrv;

Expand All @@ -223,6 +231,9 @@ public class GridNioServerWrapper {
/** Executor for establishing a connection to a node. */
private final TcpHandshakeExecutor tcpHandshakeExecutor;

/** Timestamp of the last high message queue size warning. */
private final AtomicLong lastMsqQueueSizeWarningTs = new AtomicLong();

/**
* @param log Logger.
* @param cfg Config.
Expand Down Expand Up @@ -892,7 +903,9 @@ private MessageFactory get() {
boolean clientMode = Boolean.TRUE.equals(igniteCfg.isClientMode());

IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
!clientMode && cfg.slowClientQueueLimit() > 0 ? this::checkClientQueueSize : null;
!clientMode && (cfg.slowClientQueueLimit() > 0 || msgQueueWarningSize > 0)
? msgQueueWarningSize > 0 ? this::checkNodeQueueSize : this::checkClientQueueSize
: null;

List<GridNioFilter> filters = new ArrayList<>();

Expand Down Expand Up @@ -1258,6 +1271,31 @@ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
}
}

/**
* Checks node message queue size and produce warning if message queue size exceeds the configured threshold.
*
* @param ses Node communication session.
* @param msgQueueSize Message queue size.
*/
private void checkNodeQueueSize(GridNioSession ses, int msgQueueSize) {
if (msgQueueWarningSize > 0 && msgQueueSize > msgQueueWarningSize) {
long lastWarnTs = lastMsqQueueSizeWarningTs.get();

if (U.currentTimeMillis() > lastWarnTs + MIN_MSG_QUEUE_SIZE_WARN_FREQUENCY) {
if (lastMsqQueueSizeWarningTs.compareAndSet(lastWarnTs, U.currentTimeMillis())) {
ConnectionKey id = ses.meta(CONN_IDX_META);
if (id != null) {
log.warning("Outbound message queue size for node exceeded configured " +
"messageQueueWarningSize value, it may be caused by node failure or a network problems " +
"[node=" + id.nodeId() + ", msqQueueSize=" + msgQueueSize + ']');
}
}
}
}

checkClientQueueSize(ses, msgQueueSize);
}

/**
* @param commWorker New recovery and idle clients handler.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.MaxValueMetric;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.processors.tracing.Span;
Expand Down Expand Up @@ -181,6 +183,7 @@
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CERTIFICATES;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.CONN_DISABLED_BY_ADMIN_ERR_MSG;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.newConnectionEnabledProperty;
import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
import static org.apache.ignite.internal.processors.security.SecurityUtils.authenticateLocalNode;
import static org.apache.ignite.internal.processors.security.SecurityUtils.withSecurityContext;
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
Expand Down Expand Up @@ -256,6 +259,9 @@ class ServerImpl extends TcpDiscoveryImpl {
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private StatisticsPrinter statsPrinter;

/** Metric for max message queue size. */
private MaxValueMetric maxMsgQueueSizeMetric;

/** Failed nodes (but still in topology). */
private final Map<TcpDiscoveryNode, UUID> failedNodes = new HashMap<>();

Expand Down Expand Up @@ -491,6 +497,16 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
spiCtx.registerPort(tcpSrvr.port, TCP);

MetricRegistryImpl discoReg = (MetricRegistryImpl)spiCtx.getOrCreateMetricRegistry(DISCO_METRICS);

maxMsgQueueSizeMetric = discoReg.maxValueMetric("MaxMsgQueueSize",
"Max message queue size", 60_000L, 5);

discoReg.register("Next", () -> {
TcpDiscoveryNode next = msgWorker != null ? msgWorker.next : null;
return next != null ? next.id() : null;
}, UUID.class, "Next in the ring node ID");
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3115,6 +3131,9 @@ private void addToQueue(TcpDiscoveryAbstractMessage msg, boolean addFirst) {
if (log.isDebugEnabled())
log.debug("Message has been added to a worker's queue: " + msg);
}

if (maxMsgQueueSizeMetric != null)
maxMsgQueueSizeMetric.update(queue.size());
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
Expand Down Expand Up @@ -154,13 +150,7 @@ public void testSlowClient() throws Exception {
for (int i = 0; i < 100; i++)
cache0.put(0, i);

GridIoManager ioMgr = slowClient.context().io();

TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];

GridNioServer nioSrvr = ((GridNioServerWrapper)GridTestUtils.getFieldValue(commSpi, "nioSrvWrapper")).nio();

GridTestUtils.setFieldValue(nioSrvr, "skipRead", true);
GridTestUtils.skipCommNioServerRead(slowClient, true);

// Initiate messages for client.
for (int i = 0; i < 100; i++)
Expand Down
Loading
Loading