Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
Expand Down Expand Up @@ -121,6 +120,7 @@
import org.apache.cassandra.service.accord.txn.TxnReferenceOperation;
import org.apache.cassandra.service.accord.txn.TxnReferenceOperations;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.replication.migration.MigrationRouter;
import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.BallotGenerator;
Expand Down Expand Up @@ -817,27 +817,29 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer
long timestamp = options.getTimestamp(queryState);
long nowInSeconds = options.getNowInSeconds(queryState);
List<? extends IMutation> mutations = getMutations(queryState.getClientState(), options, true, timestamp, nowInSeconds, requestTime);
boolean isTracked = !mutations.isEmpty() && Schema.instance.getKeyspaceMetadata(mutations.get(0).getKeyspaceName()).params.replicationType.isTracked();
if (isTracked)

MigrationRouter.RoutedMutations routed = MigrationRouter.routeMutations(mutations);

if (!routed.trackedMutations.isEmpty())
{
if (mutations.stream().anyMatch(m -> m instanceof CounterMutation))
if (routed.trackedMutations.stream().anyMatch(m -> m instanceof CounterMutation))
throw new InvalidRequestException("Mutation tracking is currently unsupported with counters");
if (mutations.size() > 1)
throw new InvalidRequestException("Mutation tracking is currently unsupported with unlogged batches");

Mutation mutation = (Mutation) mutations.get(0);
}

for (IMutation mutation : routed.trackedMutations)
{
String keyspaceName = mutation.getKeyspaceName();
Token token = mutation.key().getToken();
MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);
mutation.apply();
}
else

for (IMutation mutation : routed.untrackedMutations)
{
for (IMutation mutation : mutations)
mutation.apply();
mutation.apply();
}

return null;
}

Expand Down
41 changes: 41 additions & 0 deletions src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.replication.migration.MigrationRouter;
import org.apache.cassandra.service.replication.migration.MigrationRouter.MutationRouting;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
Expand All @@ -61,6 +63,7 @@ protected void processMessage(Message<T> message, InetAddressAndPort respondTo)
ClusterMetadata metadata = ClusterMetadata.current();
metadata = checkTokenOwnership(metadata, message, respondTo);
metadata = checkSchemaVersion(metadata, message, respondTo);
metadata = checkReplicationMigration(metadata, message, respondTo);
}

try
Expand Down Expand Up @@ -193,6 +196,44 @@ else if (message.epoch().isBefore(metadata.schema.lastModified()))
return metadata;
}

/**
* Confirm that the presence/absence of a mutation id matches our expectations for the given keyspace/table/token. If
* it doesn't, then we're not on the same epoch as the coordinator, or there's a bug.
*/
private ClusterMetadata checkReplicationMigration(ClusterMetadata metadata, Message<T> message, InetAddressAndPort respondTo)
{
IMutation mutation = message.payload;
MutationRouting expected = mutation.id().isNone() ? MutationRouting.UNTRACKED : MutationRouting.TRACKED;
if (expected == MigrationRouter.getMutationRouting(metadata, mutation))
return metadata;

if (message.epoch().isAfter(metadata.epoch))
{
// coordinator is ahead, fetch log and recheck
metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, respondTo, message.epoch());

// recheck, we may now be ahead of the coordinator
return checkReplicationMigration(metadata, message, respondTo);
}
else if (message.epoch().isBefore(metadata.epoch))
{
TCMMetrics.instance.coordinatorBehindReplication.mark();
throw new CoordinatorBehindException(String.format("Replication type / migration mismatch for keyspace: %s token %s, coordinator: %s is behind, our epoch = %s, their epoch = %s",
mutation.getKeyspaceName(),
mutation.key(),
respondTo,
metadata.epoch, message.epoch()));
}
else
{
// same epoch but different routing should not be possible
throw new IllegalStateException(String.format("Inconsistent mutation routing at epoch = %s. Keyspace: %s key: %s ",
metadata.epoch,
mutation.getKeyspaceName(),
mutation.key()));
}
}

private static VersionedEndpoints.ForToken writePlacements(ClusterMetadata metadata, String keyspace, DecoratedKey key)
{
return metadata.placements.get(metadata.schema.getKeyspace(keyspace).getMetadata().params.replication).writes.forToken(key.getToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.replication.migration.MigrationRouter;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;

Expand All @@ -46,6 +47,7 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
{
group = Keyspace.writeOrder.start();

MigrationRouter.validateUntrackedMutation(mutation);
// write the mutation to the commitlog and memtables
CommitLogPosition position = null;
if (makeDurable)
Expand Down
7 changes: 3 additions & 4 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1159,10 +1159,9 @@ public CommitLogPosition call()
CommitLogPosition commitLogLowerBound = mainMemtable.getCommitLogLowerBound();
commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound();
TableMetadata metadata = metadata();
if (metadata.replicationType().isTracked())
MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
else
CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);

MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);
}

metric.pendingFlushes.dec();
Expand Down
12 changes: 7 additions & 5 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper;
import org.apache.cassandra.service.replication.migration.MigrationRouter;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
Expand Down Expand Up @@ -403,9 +404,10 @@ public void initCf(TableMetadata metadata, boolean loadSSTables)

public Future<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
return getMetadata().useMutationTracking()
? applyInternalTracked(mutation, new AsyncPromise<>())
: applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>());
if (mutation.id().isNone())
return applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>());
else
return applyInternalTracked(mutation, new AsyncPromise<>());
}

public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
Expand Down Expand Up @@ -435,7 +437,7 @@ public void apply(final Mutation mutation,
boolean updateIndexes,
boolean isDroppable)
{
if (getMetadata().useMutationTracking())
if (MigrationRouter.isFullyTracked(mutation))
applyInternalTracked(mutation, null);
else
applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null);
Expand Down Expand Up @@ -612,7 +614,7 @@ else if (isDeferrable)
*/
private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
{
Preconditions.checkState(getMetadata().useMutationTracking() && !mutation.id().isNone());
Preconditions.checkState(MigrationRouter.isFullyTracked(mutation) && !mutation.id().isNone());
ClusterMetadata cm = ClusterMetadata.current();

if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS))
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/db/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,20 @@ public Pair<DecoratedKey, TableMetadata> deserializeKeyAndTableMetadata(DataInpu
return PartitionUpdate.serializer.deserializeMetadataAndKey(in, version, flag);
}

public TableId deserializeTableId(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
{
if (version >= VERSION_51)
in.skipBytes(1); // flags

if (version >= VERSION_52)
MutationId.serializer.skip(in, version);

int size = in.readUnsignedVInt32();
assert size > 0;

return PartitionUpdate.serializer.deserializeTableId(in, version, flag);
}

public Mutation deserialize(DataInputPlus in, int version) throws IOException
{
return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,11 @@ public Pair<DecoratedKey, TableMetadata> deserializeMetadataAndKey(DataInputBuff
return Pair.create(header.key, tableMetadata);
}

public TableId deserializeTableId(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
{
return TableId.deserialize(in);
}

public PartitionUpdate deserialize(PartitionKey key, TableMetadatas tables, DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
TableMetadata tableMetadata = tables.deserialize(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.service.replication.migration.MigrationRouter;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;

Expand All @@ -38,6 +39,8 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
{
group = Keyspace.writeOrder.start();

MigrationRouter.validateTrackedMutation(mutation);

Tracing.trace("Appending to mutation journal");
CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation);

Expand Down
13 changes: 12 additions & 1 deletion src/java/org/apache/cassandra/hints/Hint.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.replication.migration.MigrationRouter;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.vint.VIntCoding;
Expand Down Expand Up @@ -99,13 +101,22 @@ Future<?> applyFuture()
{
if (isLive())
{
ClusterMetadata cm = ClusterMetadata.current();
// filter out partition update for tables that have been truncated since hint's creation
Mutation filtered = mutation;
for (TableId id : mutation.getTableIds())
{
if (MigrationRouter.shouldUseTrackedForWrites(cm, mutation.getKeyspaceName(), id, mutation.key().getToken()))
{
filtered = filtered.without(id);
continue;
}

if (creationTime <= SystemKeyspace.getTruncatedAt(id))
filtered = filtered.without(id);
}

if (!filtered.isEmpty())
if (filtered != null && !filtered.isEmpty())
return filtered.applyFuture();
}

Expand Down
10 changes: 8 additions & 2 deletions src/java/org/apache/cassandra/hints/HintsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ private <T> Action sendHints(Iterator<T> hints, @Nullable BitSet hintsFilter, Co
if (hintsFilter != null && !hintsFilter.get(hintIndex))
continue;

callbacks.add(sendFunction.apply(hint));
Callback callback = sendFunction.apply(hint);
if (callback != null)
callbacks.add(callback);
}
return Action.CONTINUE;
}
Expand Down Expand Up @@ -417,7 +419,11 @@ private SplitHint splitHintIntoAccordAndNormal(ClusterMetadata cm, Hint hint)
{
SplitMutation<Mutation> splitMutation = ConsensusMigrationMutationHelper.instance().splitMutation(hint.mutation, cm);
if (splitMutation.trackedMutation != null)
throw new IllegalStateException("Cannot generate hints for tracked mutations");
{
logger.debug("Discarding tracked component of hint");
if (splitMutation.accordMutation == null && splitMutation.untrackedMutation == null)
return new SplitHint(null, null);
}
if (splitMutation.accordMutation == null)
return new SplitHint(null, hint);
if (splitMutation.untrackedMutation == null)
Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/journal/Segments.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ void select(long minTimestamp, long maxTimestamp, Collection<Segment<K, V>> into
}
}

/**
* Find index of first segment with timestamp >= given timestamp.
* Returns sorted.size() if timestamp greater than all segments.
*/
int findIdxFor(long timestamp)
{
List<Segment<K, V>> sorted = allSorted(true);
Expand All @@ -172,7 +176,7 @@ else if (res == 0)
else
high = mid - 1;
}
throw new IllegalStateException(String.format("Could not find a segment with timestamp %d among %s", timestamp, sorted));
return low;
}

boolean isSwitched(ActiveSegment<K, V> active)
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/metrics/TCMMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class TCMMetrics
public final Meter progressBarrierCLRelax;
public final Meter coordinatorBehindSchema;
public final Meter coordinatorBehindPlacements;
public final Meter coordinatorBehindReplication;
public final Gauge<Long> epochAwareDebounceTrackerSize;
public final Meter reconstructLogStateCall;

Expand Down Expand Up @@ -128,6 +129,7 @@ private TCMMetrics()

coordinatorBehindSchema = Metrics.meter(factory.createMetricName("CoordinatorBehindSchema"));
coordinatorBehindPlacements = Metrics.meter(factory.createMetricName("CoordinatorBehindPlacements"));
coordinatorBehindReplication = Metrics.meter(factory.createMetricName("CoordinatorBehindReplication"));
reconstructLogStateCall = Metrics.meter(factory.createMetricName("ReconstructLogStateCall"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
options.repairAccord(),
executor,
validationScheduler,
coordinator.minEpoch,
cfnames);
if (session == null)
continue;
Expand Down
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/repair/RepairCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
Expand Down Expand Up @@ -108,25 +109,27 @@ public class RepairCoordinator implements Runnable, ProgressEventNotifier, Repai

private final List<ProgressListener> listeners = new ArrayList<>();
private final AtomicReference<Throwable> firstError = new AtomicReference<>(null);
public final Epoch minEpoch;
final SharedContext ctx;
final Scheduler validationScheduler;

private TraceState traceState;

public RepairCoordinator(StorageService storageService, int cmd, RepairOption options, String keyspace)
public RepairCoordinator(StorageService storageService, int cmd, RepairOption options, String keyspace, Epoch minEpoch)
{
this(SharedContext.Global.instance,
(ks, tables) -> storageService.getValidColumnFamilies(false, false, ks, tables),
storageService::getLocalReplicas,
cmd, options, keyspace);
(ks, tables) -> storageService.getValidColumnFamilies(false, false, ks, tables),
storageService::getLocalReplicas,
cmd, options, keyspace, minEpoch);
}

RepairCoordinator(SharedContext ctx,
BiFunction<String, String[], Iterable<ColumnFamilyStore>> validColumnFamilies,
Function<String, RangesAtEndpoint> getLocalReplicas,
int cmd, RepairOption options, String keyspace)
int cmd, RepairOption options, String keyspace, Epoch minEpoch)
{
this.ctx = ctx;
this.minEpoch = minEpoch;
this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests());
this.state = new CoordinatorState(ctx, cmd, keyspace, options);
this.tag = "repair:" + cmd;
Expand Down Expand Up @@ -483,7 +486,7 @@ private Future<?> prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddres
state.phase.prepareStart();
Timer timer = Keyspace.open(state.keyspace).metric.repairPrepareTime;
long startNanos = ctx.clock().nanoTime();
return ctx.repair().prepareForRepair(state.id, ctx.broadcastAddressAndPort(), allNeighbors, state.options, force, columnFamilies)
return ctx.repair().prepareForRepair(state.id, ctx.broadcastAddressAndPort(), allNeighbors, state.options, force, columnFamilies, minEpoch)
.map(ignore -> {
timer.update(ctx.clock().nanoTime() - startNanos, TimeUnit.NANOSECONDS);
state.phase.prepareComplete();
Expand Down
Loading