@@ -277,15 +277,17 @@ public Plan plan() {
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
if (supportsLimitPushManifestEntries()) {
iterator = limitPushManifestEntries(iterator);
}

List<ManifestEntry> files = ListUtils.toList(iterator);
if (postFilterManifestEntriesEnabled()) {
files = postFilterManifestEntries(files);
}

if (supportsLimitPushManifestEntries()) {
Contributor: You need to do a performance test again.

iterator = limitPushManifestEntries(files.iterator());
files = ListUtils.toList(iterator);
}

List<ManifestEntry> result = files;

long scanDuration = (System.nanoTime() - started) / 1_000_000;
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FilteredManifestEntry;
@@ -34,15 +35,20 @@
import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ListUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -55,6 +61,8 @@
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {

private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);

private final SimpleStatsEvolutions fieldKeyStatsConverters;
private final SimpleStatsEvolutions fieldValueStatsConverters;
private final BucketSelectConverter bucketSelectConverter;
@@ -203,6 +211,122 @@ private boolean isValueFilterEnabled() {
}
}

/**
* Check if limit pushdown is supported for PK tables.
*
* <p>Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion
* vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks
* (no overlapping, no delete rows) are done in applyLimitPushdownForBucket.
*/
@Override
public boolean supportsLimitPushManifestEntries() {
if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
return false;
}

return limit != null && limit > 0 && !deletionVectorsEnabled;
}
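// Illustrative outcomes of the checks above (added note, not part of the diff):
//   DEDUPLICATE or FIRST_ROW, limit = 10, deletion vectors disabled -> true (per-bucket checks follow)
//   PARTIAL_UPDATE or AGGREGATE                                     -> false (rows must be merged first)
//   limit == null, limit <= 0, or deletion vectors enabled          -> false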

/**
* Apply limit pushdown by grouping files by bucket and accumulating row counts until limit is
* reached. For buckets that can't safely push down limit (overlapping files or delete rows),
* include all files.
*/
@Override
protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
long startTime = System.nanoTime();
List<ManifestEntry> allEntries = ListUtils.toList(entries);
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(allEntries);

List<ManifestEntry> result = new ArrayList<>();
long accumulatedRowCount = 0;

for (List<ManifestEntry> bucketEntries : buckets.values()) {
if (accumulatedRowCount >= limit) {
break;
}

long remainingLimit = limit - accumulatedRowCount;
List<ManifestEntry> processedBucket =
applyLimitPushdownForBucket(bucketEntries, remainingLimit);
if (processedBucket == null) {
result.addAll(bucketEntries);
} else {
result.addAll(processedBucket);
for (ManifestEntry entry : processedBucket) {
long fileRowCount = entry.file().rowCount();
accumulatedRowCount += fileRowCount;
}
}
}

long duration = (System.nanoTime() - startTime) / 1_000_000;
LOG.info(
"Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, "
+ "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}",
duration,
limit,
allEntries.size(),
result.size(),
mergeEngine,
scanMode,
deletionVectorsEnabled);
return result.iterator();
}
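// Worked example (added for illustration): limit = 100; bucket A holds one level-0 file with
// 80 rows, bucket B holds two non-overlapping files with 60 and 50 rows. Bucket A cannot be
// pushed down (applyLimitPushdownForBucket returns null), so all of its files are kept but its
// rows are not counted. Bucket B then contributes 60 + 50 = 110 >= 100 and the loop stops.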

/**
* Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe.
*
* <p>Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For
* non-overlapping files with no delete rows, accumulates row counts until limit is reached.
*
* @param bucketEntries files in the same bucket
* @param limit the limit to apply
* @return files to include, or null if we can't safely push down limit
*/
@Nullable
private List<ManifestEntry> applyLimitPushdownForBucket(
List<ManifestEntry> bucketEntries, long limit) {
// Check if this bucket has overlapping files (LSM property)
boolean hasOverlapping = !noOverlapping(bucketEntries);

if (hasOverlapping) {
// For buckets with overlapping, we can't safely push down limit because files
// need to be merged and we can't accurately calculate the merged row count.
return null;
}

// For buckets without overlapping and with merge engines that don't require
// merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count
// and stop when limit is reached, but only if files have no delete rows.
List<ManifestEntry> result = new ArrayList<>();
long accumulatedRowCount = 0;

for (ManifestEntry entry : bucketEntries) {
long fileRowCount = entry.file().rowCount();
// Check if file has delete rows - if so, we can't accurately calculate
// the merged row count, so we need to stop limit pushdown
boolean hasDeleteRows =
entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);

if (hasDeleteRows) {
// If file has delete rows, we can't accurately calculate merged row count
// without reading the actual data. Can't safely push down limit.
return null;
}

// File has no delete rows, no overlapping, and merge engine doesn't require merge.
// Safe to count rows.
result.add(entry);
accumulatedRowCount += fileRowCount;
if (accumulatedRowCount >= limit) {
break;
}
}

return result;
}
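// Worked example (added for illustration): remaining limit = 100; the bucket's files sit on the
// same non-zero level (no overlap) with row counts [40, 70, 30] and no delete rows. The first
// two files are kept (40 + 70 = 110 >= 100) and the third is skipped. If any file reported
// deleteRowCount > 0, the method would return null and the caller would keep the whole bucket.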

@Override
protected boolean postFilterManifestEntriesEnabled() {
return valueFilter != null && scanMode == ScanMode.ALL;
@@ -214,15 +338,8 @@ protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> file
// Why do this: because in a primary key table, we can't just filter values
// by the stats in each file (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do it by filtering the whole bucket's files
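// Illustrative scenario for the comment above (added, assuming standard PK merge semantics):
// a bucket holds key k with value 5 in an older file and value 9 in a newer file, and the
// filter is value = 5. Dropping only the newer file based on its stats would surface the stale
// row (k, 5), whereas deciding per whole bucket keeps both files, merges them to (k, 9), and
// correctly returns nothing.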
return files.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
file -> Pair.of(file.partition(), file.bucket()),
LinkedHashMap::new,
Collectors.toList()))
.values()
.stream()
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
return buckets.values().stream()
.map(this::doFilterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());
@@ -316,4 +433,19 @@ private static boolean noOverlapping(List<ManifestEntry> entries) {

return true;
}

/**
* Group manifest entries by (partition, bucket) while preserving order. This is a common
* operation used by both limitPushManifestEntries and postFilterManifestEntries.
*/
private Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> groupByBucket(
List<ManifestEntry> entries) {
return entries.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
file -> Pair.of(file.partition(), file.bucket()),
LinkedHashMap::new,
Collectors.toList()));
}
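// Illustrative behaviour (added note): for input entries e1(p0, 0), e2(p1, 1), e3(p0, 0), the
// LinkedHashMap preserves first-seen order, so the grouping is {(p0, 0): [e1, e3], (p1, 1): [e2]}.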
}
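
To make the new control flow easier to follow outside the diff, here is a compact, self-contained sketch of the same idea. All names below (LimitPushdownSketch, FileEntry, limitPush, pickForBucket) are hypothetical simplifications, not Paimon classes, and the overlap rule is only the one stated in the javadoc above (level-0 files or mixed levels).

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Standalone sketch of the limit-pushdown idea in this PR; not Paimon code. */
public class LimitPushdownSketch {

    /** Hypothetical stand-in for a ManifestEntry plus the bits of DataFileMeta used here. */
    record FileEntry(String partition, int bucket, long rowCount, long deleteRowCount, int level) {}

    static List<FileEntry> limitPush(List<FileEntry> entries, long limit) {
        // Group by (partition, bucket), preserving encounter order like groupByBucket in the PR.
        Map<String, List<FileEntry>> buckets = new LinkedHashMap<>();
        for (FileEntry e : entries) {
            buckets.computeIfAbsent(e.partition() + "/" + e.bucket(), k -> new ArrayList<>()).add(e);
        }

        List<FileEntry> result = new ArrayList<>();
        long accumulated = 0;
        for (List<FileEntry> bucket : buckets.values()) {
            if (accumulated >= limit) {
                break;
            }
            List<FileEntry> picked = pickForBucket(bucket, limit - accumulated);
            if (picked == null) {
                // Unsafe bucket: keep every file and do not count its rows toward the limit.
                result.addAll(bucket);
            } else {
                result.addAll(picked);
                for (FileEntry e : picked) {
                    accumulated += e.rowCount();
                }
            }
        }
        return result;
    }

    /** Returns null when pushdown is unsafe for this bucket, mirroring applyLimitPushdownForBucket. */
    static List<FileEntry> pickForBucket(List<FileEntry> bucket, long remaining) {
        // Overlap rule taken from the javadoc above: level-0 files or files on different levels
        // may overlap and would need merging, so the bucket is kept as a whole.
        boolean mayOverlap =
                bucket.stream().anyMatch(f -> f.level() == 0)
                        || bucket.stream().map(FileEntry::level).distinct().count() > 1;
        if (mayOverlap) {
            return null;
        }

        List<FileEntry> picked = new ArrayList<>();
        long count = 0;
        for (FileEntry f : bucket) {
            if (f.deleteRowCount() > 0) {
                return null; // merged row count is unknown without reading the data
            }
            picked.add(f);
            count += f.rowCount();
            if (count >= remaining) {
                break;
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        List<FileEntry> entries = List.of(
                new FileEntry("p0", 0, 80, 0, 0),   // level-0 file: bucket 0 is unsafe
                new FileEntry("p0", 1, 40, 0, 5),
                new FileEntry("p0", 1, 70, 0, 5),
                new FileEntry("p0", 1, 30, 0, 5));
        // With limit = 100: bucket (p0, 0) is kept whole but not counted, then bucket (p0, 1)
        // keeps its first two files (40 + 70 >= 100) and skips the third.
        System.out.println(limitPush(entries, 100));
    }
}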