From e1f938cde17aca3b11de80785ac589faca48fb64 Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Sun, 28 Dec 2025 22:56:37 +0800 Subject: [PATCH] [Core] support limit pushdown with pk table --- .../operation/AbstractFileStoreScan.java | 8 +- .../operation/KeyValueFileStoreScan.java | 150 ++++++++++++++++-- .../operation/KeyValueFileStoreScanTest.java | 148 ++++++++++++++++- .../flink/PrimaryKeyFileStoreTableITCase.java | 127 +++++++++++++++ 4 files changed, 414 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 284a2ef19579..9d64ddc73de6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -277,15 +277,17 @@ public Plan plan() { List manifests = manifestsResult.filteredManifests; Iterator iterator = readManifestEntries(manifests, false); - if (supportsLimitPushManifestEntries()) { - iterator = limitPushManifestEntries(iterator); - } List files = ListUtils.toList(iterator); if (postFilterManifestEntriesEnabled()) { files = postFilterManifestEntries(files); } + if (supportsLimitPushManifestEntries()) { + iterator = limitPushManifestEntries(files.iterator()); + files = ListUtils.toList(iterator); + } + List result = files; long scanDuration = (System.nanoTime() - started) / 1_000_000; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index ef6fd1e52b8e..58fce9342493 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FilteredManifestEntry; @@ -34,15 +35,20 @@ import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ListUtils; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -55,6 +61,8 @@ /** {@link FileStoreScan} for {@link KeyValueFileStore}. */ public class KeyValueFileStoreScan extends AbstractFileStoreScan { + private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class); + private final SimpleStatsEvolutions fieldKeyStatsConverters; private final SimpleStatsEvolutions fieldValueStatsConverters; private final BucketSelectConverter bucketSelectConverter; @@ -203,6 +211,122 @@ private boolean isValueFilterEnabled() { } } + /** + * Check if limit pushdown is supported for PK tables. + * + *
+     * <p>Not supported when the merge engine is PARTIAL_UPDATE or AGGREGATE (rows must be merged)
+     * or when deletion vectors are enabled (deleted rows cannot be counted from file metadata).
+     * For DEDUPLICATE and FIRST_ROW, per-bucket checks (no overlapping files, no delete rows) are
+     * done in {@link #applyLimitPushdownForBucket}.
+     */
+    @Override
+    public boolean supportsLimitPushManifestEntries() {
+        if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
+            return false;
+        }
+
+        return limit != null && limit > 0 && !deletionVectorsEnabled;
+    }
+
+    /**
+     * Apply limit pushdown by grouping files by (partition, bucket) and accumulating row counts
+     * until the limit is reached. Buckets where the limit cannot be pushed down safely
+     * (overlapping files or delete rows) contribute all of their files and are not counted
+     * towards the limit.
+     */
+    @Override
+    protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
+        long startTime = System.nanoTime();
+        List<ManifestEntry> allEntries = ListUtils.toList(entries);
+        Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(allEntries);
+
+        List<ManifestEntry> result = new ArrayList<>();
+        long accumulatedRowCount = 0;
+
+        for (List<ManifestEntry> bucketEntries : buckets.values()) {
+            if (accumulatedRowCount >= limit) {
+                break;
+            }
+
+            long remainingLimit = limit - accumulatedRowCount;
+            List<ManifestEntry> processedBucket =
+                    applyLimitPushdownForBucket(bucketEntries, remainingLimit);
+            if (processedBucket == null) {
+                // Unsafe bucket: include all of its files and do not count them towards the limit.
+                result.addAll(bucketEntries);
+            } else {
+                result.addAll(processedBucket);
+                for (ManifestEntry entry : processedBucket) {
+                    long fileRowCount = entry.file().rowCount();
+                    accumulatedRowCount += fileRowCount;
+                }
+            }
+        }
+
+        long duration = (System.nanoTime() - startTime) / 1_000_000;
+        LOG.info(
+                "Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, "
+                        + "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}",
+                duration,
+                limit,
+                allEntries.size(),
+                result.size(),
+                mergeEngine,
+                scanMode,
+                deletionVectorsEnabled);
+        return result.iterator();
+    }
+
+    /**
+     * Apply limit pushdown for a single bucket. Returns the files to include, or null if unsafe.
+     *
+     * <p>Returns null if the files overlap (LSM level 0 or files on different levels) or contain
+     * delete rows. For non-overlapping files with no delete rows, row counts are accumulated until
+     * the limit is reached.
+     *
+     * @param bucketEntries files in the same bucket
+     * @param limit the limit to apply
+     * @return the files to include, or null if the limit cannot be pushed down safely
+     */
+    @Nullable
+    private List<ManifestEntry> applyLimitPushdownForBucket(
+            List<ManifestEntry> bucketEntries, long limit) {
+        // Check if this bucket has overlapping files (LSM property).
+        boolean hasOverlapping = !noOverlapping(bucketEntries);
+
+        if (hasOverlapping) {
+            // Overlapping files must be merged, and the merged row count cannot be derived from
+            // file metadata alone, so the limit cannot be pushed down for this bucket.
+            return null;
+        }
+
+        // For buckets without overlapping files and with merge engines that don't require a merge
+        // (DEDUPLICATE or FIRST_ROW), row counts can be accumulated safely and the scan can stop
+        // once the limit is reached, but only if the files contain no delete rows.
+        List<ManifestEntry> result = new ArrayList<>();
+        long accumulatedRowCount = 0;
+
+        for (ManifestEntry entry : bucketEntries) {
+            long fileRowCount = entry.file().rowCount();
+            boolean hasDeleteRows =
+                    entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+
+            if (hasDeleteRows) {
+                // With delete rows, the merged row count cannot be calculated without reading the
+                // actual data, so the limit cannot be pushed down for this bucket.
+                return null;
+            }
+
+            // No delete rows, no overlapping files, and the merge engine does not require a merge:
+            // safe to count rows.
+            result.add(entry);
+            accumulatedRowCount += fileRowCount;
+            if (accumulatedRowCount >= limit) {
+                break;
+            }
+        }
+
+        return result;
+    }
+
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
         return valueFilter != null && scanMode == ScanMode.ALL;
     }
@@ -214,15 +338,8 @@ protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> file
         // Why do this: because in primary key table, we can't just filter the value
         // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
         // but we can do this by filter the whole bucket files
-        return files.stream()
-                .collect(
-                        Collectors.groupingBy(
-                                // we use LinkedHashMap to avoid disorder
-                                file -> Pair.of(file.partition(), file.bucket()),
-                                LinkedHashMap::new,
-                                Collectors.toList()))
-                .values()
-                .stream()
+        Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
+        return buckets.values().stream()
                 .map(this::doFilterWholeBucketByStats)
                 .flatMap(Collection::stream)
                 .collect(Collectors.toList());
@@ -316,4 +433,19 @@ private static boolean noOverlapping(List<ManifestEntry> entries) {
         return true;
     }
+
+    /**
+     * Group manifest entries by (partition, bucket) while preserving order. This is a common
+     * operation used by both limitPushManifestEntries and postFilterManifestEntries.
+ */ + private Map, List> groupByBucket( + List entries) { + return entries.stream() + .collect( + Collectors.groupingBy( + // we use LinkedHashMap to avoid disorder + file -> Pair.of(file.partition(), file.bucket()), + LinkedHashMap::new, + Collectors.toList())); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 4f3d5c1c24dd..6cf1ae94157e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -32,7 +32,6 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,7 +58,6 @@ public class KeyValueFileStoreScanTest { private TestKeyValueGenerator gen; @TempDir java.nio.file.Path tempDir; private TestFileStore store; - private SnapshotManager snapshotManager; @BeforeEach public void beforeEach() throws Exception { @@ -76,7 +74,6 @@ public void beforeEach() throws Exception { DeduplicateMergeFunction.factory(), null) .build(); - snapshotManager = store.snapshotManager(); SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); @@ -271,6 +268,147 @@ public void testDropStatsInPlan() throws Exception { } } + @Test + public void testLimitPushdownWithoutValueFilter() throws Exception { + // Write multiple files to test limit pushdown + List data1 = generateData(50); + writeData(data1); + List data2 = generateData(50); + writeData(data2); + List data3 = generateData(50); + Snapshot snapshot = writeData(data3); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit, should read fewer files (limit pushdown should work) + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10); + List filesWithLimit = scanWithLimit.plan().files(); + // Limit pushdown should reduce the number of files read + assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles); + assertThat(filesWithLimit.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithValueFilter() throws Exception { + // Write data with different item values + List data1 = generateData(50, 0, 100L); + writeData(data1); + List data2 = generateData(50, 0, 200L); + writeData(data2); + List data3 = generateData(50, 0, 300L); + Snapshot snapshot = writeData(data3); + + // Without valueFilter, limit pushdown should work + KeyValueFileStoreScan scanWithoutFilter = store.newScan(); + scanWithoutFilter.withSnapshot(snapshot.id()).withLimit(10); + List filesWithoutFilter = scanWithoutFilter.plan().files(); + int totalFilesWithoutFilter = filesWithoutFilter.size(); + assertThat(totalFilesWithoutFilter).isGreaterThan(0); + + // With valueFilter, limit pushdown should still work (postFilterManifestEntries runs first) + // postFilterManifestEntries filters files first, then limitPushManifestEntries operates + // on the filtered list. 
Using file.rowCount() as a conservative upper bound ensures + // we don't return fewer than limit rows, even if we might read slightly more files. + KeyValueFileStoreScan scanWithFilter = store.newScan(); + scanWithFilter.withSnapshot(snapshot.id()); + scanWithFilter.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE) + .between(4, 100L, 200L)); + scanWithFilter.withLimit(10); + List filesWithFilter = scanWithFilter.plan().files(); + + // Limit pushdown should work with valueFilter + // The number of files should be less than or equal to the total files after filtering + assertThat(filesWithFilter.size()).isGreaterThan(0); + assertThat(filesWithFilter.size()).isLessThanOrEqualTo(totalFilesWithoutFilter); + } + + @Test + public void testLimitPushdownWithKeyFilter() throws Exception { + // Write data with different shop IDs + List data = generateData(200); + Snapshot snapshot = writeData(data); + + // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit + // pushdown) + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withKeyFilter( + new PredicateBuilder(RowType.of(new IntType(false))) + .equal(0, data.get(0).key().getInt(0))); + scan.withLimit(5); + List files = scan.plan().files(); + assertThat(files.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownMultipleBuckets() throws Exception { + // Write data to multiple buckets to test limit pushdown across buckets + List data1 = generateData(30); + writeData(data1); + List data2 = generateData(30); + writeData(data2); + List data3 = generateData(30); + Snapshot snapshot = writeData(data3); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit, should read fewer files (limit pushdown should work across buckets) + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(20); + List filesWithLimit = scanWithLimit.plan().files(); + // Limit pushdown should reduce the number of files read + assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles); + assertThat(filesWithLimit.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithSmallLimit() throws Exception { + // Test limit pushdown with a very small limit + List data1 = generateData(100); + writeData(data1); + List data2 = generateData(100); + writeData(data2); + Snapshot snapshot = writeData(data2); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(1); + List files = scan.plan().files(); + // Should read at least one file, but fewer than all files + assertThat(files.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithLargeLimit() throws Exception { + // Test limit pushdown with a large limit (larger than total rows) + List data1 = generateData(50); + writeData(data1); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2); + + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10000); + List 
filesWithLimit = scanWithLimit.plan().files(); + // With a large limit, should read all files + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + private void runTestExactMatch( FileStoreScan scan, Long expectedSnapshotId, Map expected) throws Exception { @@ -307,10 +445,6 @@ private List generateData(int numRecords) { return data; } - private List generateData(int numRecords, int hr) { - return generateData(numRecords, hr, null); - } - private List generateData(int numRecords, int hr, Long itemId) { List data = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index e1426fc8b56d..dd3197a399c0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -1579,4 +1579,131 @@ private void assertResult(int numProducers) { } } } + + @Test + public void testLimitPushdownWithTimeFilter() throws Exception { + // This test verifies that limit pushdown works correctly when valueFilter + // (e.g., time-based where conditions) is present. postFilterManifestEntries runs first + // to filter files, then limitPushManifestEntries operates on the filtered list. + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "ts TIMESTAMP(3), " + + "PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + // Insert data with different timestamps + tEnv.executeSql( + "INSERT INTO T VALUES " + + "(1, 'a', TIMESTAMP '2024-01-01 10:00:00'), " + + "(2, 'b', TIMESTAMP '2024-01-01 11:00:00'), " + + "(3, 'c', TIMESTAMP '2024-01-01 12:00:00'), " + + "(4, 'd', TIMESTAMP '2024-01-01 13:00:00'), " + + "(5, 'e', TIMESTAMP '2024-01-01 14:00:00')") + .await(); + + // Without filter, limit pushdown should work + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) { + List allRows = new ArrayList<>(); + iter.forEachRemaining(allRows::add); + assertThat(allRows.size()).isEqualTo(3); + } + + // Test limit pushdown with time filter (4 rows match, LIMIT 3) + try (CloseableIterator iter = + tEnv.executeSql( + "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 11:00:00' LIMIT 3") + .collect()) { + List filteredRows = new ArrayList<>(); + iter.forEachRemaining(filteredRows::add); + assertThat(filteredRows.size()).isGreaterThanOrEqualTo(3); + assertThat(filteredRows.size()).isLessThanOrEqualTo(4); + for (Row row : filteredRows) { + java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2); + java.time.LocalDateTime filterTime = + java.time.LocalDateTime.parse("2024-01-01T11:00:00"); + assertThat(ts).isAfterOrEqualTo(filterTime); + } + } + + // Test with more restrictive filter (3 rows match, LIMIT 2) + try (CloseableIterator iter = + tEnv.executeSql( + "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 12:00:00' LIMIT 2") + .collect()) { + List filteredRows2 = new ArrayList<>(); + iter.forEachRemaining(filteredRows2::add); + assertThat(filteredRows2.size()).isGreaterThanOrEqualTo(2); + assertThat(filteredRows2.size()).isLessThanOrEqualTo(3); + for (Row row : filteredRows2) { + 
java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2); + java.time.LocalDateTime filterTime = + java.time.LocalDateTime.parse("2024-01-01T12:00:00"); + assertThat(ts).isAfterOrEqualTo(filterTime); + } + } + } + + @Test + public void testLimitPushdownBasic() throws Exception { + // Test basic limit pushdown + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await(); + + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + + assertThat(rows.size()).isEqualTo(5); + } + } + + @Test + public void testLimitPushdownWithDeletionVector() throws Exception { + // Test limit pushdown is disabled when deletion vector is enabled + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + "'deletion-vectors.enabled' = 'true'" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + + tEnv.executeSql("DELETE FROM T WHERE id = 2").await(); + + // Limit pushdown should be disabled when deletion vector is enabled + // because we can't accurately calculate row count after applying deletion vectors + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + + assertThat(rows.size()).isEqualTo(3); + + for (Row row : rows) { + assertThat(row.getField(0)).isNotEqualTo(2); + } + } + } }
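
For reference, a minimal sketch of how the new limit pushdown is exercised through the scan API, mirroring testLimitPushdownWithoutValueFilter above; it assumes a store and snapshot prepared as in KeyValueFileStoreScanTest (DEDUPLICATE merge engine, deletion vectors disabled):

    // Plan a scan with a limit on a primary-key table. With non-overlapping files and no
    // delete rows in each bucket, the plan should contain only enough files to cover the
    // first ~10 rows instead of every file in the snapshot.
    KeyValueFileStoreScan scan = store.newScan();
    scan.withSnapshot(snapshot.id()).withLimit(10);
    List<ManifestEntry> files = scan.plan().files();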