listTableDetailsPaged(
- String databaseName, Integer maxResults, String pageToken, String tableNamePattern)
+ String databaseName,
+ Integer maxResults,
+ String pageToken,
+ String tableNamePattern,
+ String tableType)
throws DatabaseNotExistException {
- return wrapped.listTableDetailsPaged(databaseName, maxResults, pageToken, tableNamePattern);
+ return wrapped.listTableDetailsPaged(
+ databaseName, maxResults, pageToken, tableNamePattern, tableType);
}
@Override
@@ -165,6 +175,11 @@ public boolean supportsListByPattern() {
return wrapped.supportsListByPattern();
}
+ @Override
+ public boolean supportsListTableByType() {
+ return wrapped.supportsListTableByType();
+ }
+
@Override
public boolean supportsVersionManagement() {
return wrapped.supportsVersionManagement();
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 67b97389175b..6ce9e1c3875e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -52,7 +52,6 @@ public DeletionVectorIndexFileWriter(
* TODO: We can consider sending a message to delete the deletion file in the future.
*/
public IndexFileMeta writeSingleFile(Map input) throws IOException {
-
DeletionFileWriter writer = new DeletionFileWriter(indexPathFactory, fileIO);
try {
for (Map.Entry entry : input.entrySet()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
index 1a392b0ae9e5..3714d59bfb24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -174,15 +174,15 @@ private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
int timestampPrecision = ((TimestampType) dataType).getPrecision();
Preconditions.checkArgument(
- timestampPrecision > 3 && timestampPrecision <= 6,
- "Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
- return "timestamp";
+ timestampPrecision > 3 && timestampPrecision <= 9,
+ "Paimon Iceberg compatibility only support timestamp type with precision from 4 to 9.");
+ return timestampPrecision >= 7 ? "timestamp_ns" : "timestamp";
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
int timestampLtzPrecision = ((LocalZonedTimestampType) dataType).getPrecision();
Preconditions.checkArgument(
- timestampLtzPrecision > 3 && timestampLtzPrecision <= 6,
- "Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
- return "timestamptz";
+ timestampLtzPrecision > 3 && timestampLtzPrecision <= 9,
+ "Paimon Iceberg compatibility only support timestamp type with precision from 4 to 9.");
+ return timestampLtzPrecision >= 7 ? "timestamptz_ns" : "timestamptz";
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
return new IcebergListType(
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 4496ec50f4b2..51d46f62e9b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -85,10 +85,11 @@ public static LinkedHashMap rowArrayDataToDvMetas(
LinkedHashMap dvMetas = new LinkedHashMap<>(arrayData.size());
for (int i = 0; i < arrayData.size(); i++) {
InternalRow row = arrayData.getRow(i, DeletionVectorMeta.SCHEMA.getFieldCount());
+ String dataFileName = row.getString(0).toString();
dvMetas.put(
- row.getString(0).toString(),
+ dataFileName,
new DeletionVectorMeta(
- row.getString(0).toString(),
+ dataFileName,
row.getInt(1),
row.getInt(2),
row.isNullAt(3) ? null : row.getLong(3)));
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
index ad51642a2a4b..6454c99ef8ab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
@@ -18,6 +18,9 @@
package org.apache.paimon.io;
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -29,14 +32,27 @@ public class CompactIncrement {
private final List compactBefore;
private final List compactAfter;
private final List changelogFiles;
+ private final List newIndexFiles;
+ private final List deletedIndexFiles;
public CompactIncrement(
List compactBefore,
List compactAfter,
List changelogFiles) {
+ this(compactBefore, compactAfter, changelogFiles, new ArrayList<>(), new ArrayList<>());
+ }
+
+ public CompactIncrement(
+ List compactBefore,
+ List compactAfter,
+ List changelogFiles,
+ List newIndexFiles,
+ List deletedIndexFiles) {
this.compactBefore = compactBefore;
this.compactAfter = compactAfter;
this.changelogFiles = changelogFiles;
+ this.newIndexFiles = newIndexFiles;
+ this.deletedIndexFiles = deletedIndexFiles;
}
public List compactBefore() {
@@ -51,8 +67,20 @@ public List changelogFiles() {
return changelogFiles;
}
+ public List newIndexFiles() {
+ return newIndexFiles;
+ }
+
+ public List deletedIndexFiles() {
+ return deletedIndexFiles;
+ }
+
public boolean isEmpty() {
- return compactBefore.isEmpty() && compactAfter.isEmpty() && changelogFiles.isEmpty();
+ return compactBefore.isEmpty()
+ && compactAfter.isEmpty()
+ && changelogFiles.isEmpty()
+ && newIndexFiles.isEmpty()
+ && deletedIndexFiles.isEmpty();
}
@Override
@@ -67,7 +95,9 @@ public boolean equals(Object o) {
CompactIncrement that = (CompactIncrement) o;
return Objects.equals(compactBefore, that.compactBefore)
&& Objects.equals(compactAfter, that.compactAfter)
- && Objects.equals(changelogFiles, that.changelogFiles);
+ && Objects.equals(changelogFiles, that.changelogFiles)
+ && Objects.equals(newIndexFiles, that.newIndexFiles)
+ && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
}
@Override
@@ -78,10 +108,14 @@ public int hashCode() {
@Override
public String toString() {
return String.format(
- "CompactIncrement {compactBefore = %s, compactAfter = %s, changelogFiles = %s}",
+ "CompactIncrement {compactBefore = %s, compactAfter = %s, changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}",
compactBefore.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
compactAfter.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
- changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()));
+ changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
+ newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
+ deletedIndexFiles.stream()
+ .map(IndexFileMeta::fileName)
+ .collect(Collectors.toList()));
}
public static CompactIncrement emptyIncrement() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index f7b283daf013..8f7c9d6dad3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -48,7 +48,7 @@ public class DataFileRecordReader implements FileRecordReader {
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
- private final boolean rowLineageEnabled;
+ private final boolean rowTrackingEnabled;
@Nullable private final Long firstRowId;
private final long maxSequenceNumber;
private final Map systemFields;
@@ -60,7 +60,7 @@ public DataFileRecordReader(
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
- boolean rowLineageEnabled,
+ boolean rowTrackingEnabled,
@Nullable Long firstRowId,
long maxSequenceNumber,
Map systemFields)
@@ -75,7 +75,7 @@ public DataFileRecordReader(
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
- this.rowLineageEnabled = rowLineageEnabled;
+ this.rowTrackingEnabled = rowTrackingEnabled;
this.firstRowId = firstRowId;
this.maxSequenceNumber = maxSequenceNumber;
this.systemFields = systemFields;
@@ -91,10 +91,10 @@ public FileRecordIterator readBatch() throws IOException {
if (iterator instanceof ColumnarRowIterator) {
iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping);
- if (rowLineageEnabled) {
+ if (rowTrackingEnabled) {
iterator =
((ColumnarRowIterator) iterator)
- .assignRowLineage(firstRowId, maxSequenceNumber, systemFields);
+ .assignRowTracking(firstRowId, maxSequenceNumber, systemFields);
}
} else {
if (partitionInfo != null) {
@@ -108,33 +108,33 @@ public FileRecordIterator readBatch() throws IOException {
iterator = iterator.transform(projectedRow::replaceRow);
}
- if (rowLineageEnabled && !systemFields.isEmpty()) {
- GenericRow lineageRow = new GenericRow(2);
+ if (rowTrackingEnabled && !systemFields.isEmpty()) {
+ GenericRow trackingRow = new GenericRow(2);
- int[] fallbackToLineageMappings = new int[tableRowType.getFieldCount()];
- Arrays.fill(fallbackToLineageMappings, -1);
+ int[] fallbackToTrackingMappings = new int[tableRowType.getFieldCount()];
+ Arrays.fill(fallbackToTrackingMappings, -1);
if (systemFields.containsKey(SpecialFields.ROW_ID.name())) {
- fallbackToLineageMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
+ fallbackToTrackingMappings[systemFields.get(SpecialFields.ROW_ID.name())] = 0;
}
if (systemFields.containsKey(SpecialFields.SEQUENCE_NUMBER.name())) {
- fallbackToLineageMappings[
+ fallbackToTrackingMappings[
systemFields.get(SpecialFields.SEQUENCE_NUMBER.name())] =
1;
}
FallbackMappingRow fallbackMappingRow =
- new FallbackMappingRow(fallbackToLineageMappings);
+ new FallbackMappingRow(fallbackToTrackingMappings);
final FileRecordIterator iteratorInner = iterator;
iterator =
iterator.transform(
row -> {
if (firstRowId != null) {
- lineageRow.setField(
+ trackingRow.setField(
0, iteratorInner.returnedPosition() + firstRowId);
}
- lineageRow.setField(1, maxSequenceNumber);
- return fallbackMappingRow.replace(row, lineageRow);
+ trackingRow.setField(1, maxSequenceNumber);
+ return fallbackMappingRow.replace(row, trackingRow);
});
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
index b7860fb15e79..6049c4dbbb0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
@@ -18,25 +18,41 @@
package org.apache.paimon.io;
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-/** Newly created data files and changelog files. */
+/** Increment of data files, changelog files and index files. */
public class DataIncrement {
private final List newFiles;
private final List deletedFiles;
private final List changelogFiles;
+ private final List newIndexFiles;
+ private final List deletedIndexFiles;
public DataIncrement(
List newFiles,
List deletedFiles,
List changelogFiles) {
+ this(newFiles, deletedFiles, changelogFiles, new ArrayList<>(), new ArrayList<>());
+ }
+
+ public DataIncrement(
+ List newFiles,
+ List deletedFiles,
+ List changelogFiles,
+ List newIndexFiles,
+ List deletedIndexFiles) {
this.newFiles = newFiles;
this.deletedFiles = deletedFiles;
this.changelogFiles = changelogFiles;
+ this.newIndexFiles = newIndexFiles;
+ this.deletedIndexFiles = deletedIndexFiles;
}
public static DataIncrement emptyIncrement() {
@@ -56,8 +72,20 @@ public List changelogFiles() {
return changelogFiles;
}
+ public List newIndexFiles() {
+ return newIndexFiles;
+ }
+
+ public List deletedIndexFiles() {
+ return deletedIndexFiles;
+ }
+
public boolean isEmpty() {
- return newFiles.isEmpty() && changelogFiles.isEmpty();
+ return newFiles.isEmpty()
+ && deletedFiles.isEmpty()
+ && changelogFiles.isEmpty()
+ && newIndexFiles.isEmpty()
+ && deletedIndexFiles.isEmpty();
}
@Override
@@ -71,20 +99,28 @@ public boolean equals(Object o) {
DataIncrement that = (DataIncrement) o;
return Objects.equals(newFiles, that.newFiles)
- && Objects.equals(changelogFiles, that.changelogFiles);
+ && Objects.equals(deletedFiles, that.deletedFiles)
+ && Objects.equals(changelogFiles, that.changelogFiles)
+ && Objects.equals(newIndexFiles, that.newIndexFiles)
+ && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
}
@Override
public int hashCode() {
- return Objects.hash(newFiles, changelogFiles);
+ return Objects.hash(
+ newFiles, deletedFiles, changelogFiles, newIndexFiles, deletedIndexFiles);
}
@Override
public String toString() {
return String.format(
- "DataIncrement {newFiles = %s, deletedFiles = %s, changelogFiles = %s}",
+ "DataIncrement {newFiles = %s, deletedFiles = %s, changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}",
newFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
deletedFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
- changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()));
+ changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
+ newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
+ deletedIndexFiles.stream()
+ .map(IndexFileMeta::fileName)
+ .collect(Collectors.toList()));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
deleted file mode 100644
index 9f985f54ed41..000000000000
--- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.index.IndexFileMeta;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/** Incremental index files. */
-public class IndexIncrement {
-
- private final List newIndexFiles;
-
- private final List deletedIndexFiles;
-
- public IndexIncrement(List newIndexFiles) {
- this.newIndexFiles = newIndexFiles;
- this.deletedIndexFiles = Collections.emptyList();
- }
-
- public IndexIncrement(
- List newIndexFiles, List deletedIndexFiles) {
- this.newIndexFiles = newIndexFiles;
- this.deletedIndexFiles = deletedIndexFiles;
- }
-
- public List newIndexFiles() {
- return newIndexFiles;
- }
-
- public List deletedIndexFiles() {
- return deletedIndexFiles;
- }
-
- public boolean isEmpty() {
- return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- IndexIncrement that = (IndexIncrement) o;
- return Objects.equals(newIndexFiles, that.newIndexFiles)
- && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
- }
-
- @Override
- public int hashCode() {
- List all = new ArrayList<>(newIndexFiles);
- all.addAll(deletedIndexFiles);
- return Objects.hash(all);
- }
-
- @Override
- public String toString() {
- return String.format(
- "IndexIncrement {newIndexFiles = %s, deletedIndexFiles = %s}",
- newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
- deletedIndexFiles.stream()
- .map(IndexFileMeta::fileName)
- .collect(Collectors.toList()));
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 6e858fbd2286..88f1a740edea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -50,7 +50,7 @@ public class PojoDataFileMeta implements DataFileMeta {
private final SimpleStats keyStats;
private final SimpleStats valueStats;
- // As for row-lineage table, this will be reassigned while committing
+ // As for row-tracking table, this will be reassigned while committing
private final long minSequenceNumber;
private final long maxSequenceNumber;
private final long schemaId;
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 52b029563e9c..b400258b8439 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -22,7 +22,6 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -40,6 +39,7 @@
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+import static org.apache.paimon.utils.Preconditions.checkState;
/** Entry representing a file. */
public interface FileEntry {
@@ -77,7 +77,7 @@ class Identifier {
public final int level;
public final String fileName;
public final List extraFiles;
- @Nullable private final byte[] embeddedIndex;
+ @Nullable public final byte[] embeddedIndex;
@Nullable public final String externalPath;
/* Cache the hash code for the string */
@@ -190,7 +190,7 @@ static void mergeEntries(Iterable entries, Map from(List entries) {
return entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
}
@@ -115,6 +130,11 @@ public String fileName() {
return fileName;
}
+ @Nullable
+ public byte[] embeddedIndex() {
+ return embeddedIndex;
+ }
+
@Nullable
@Override
public String externalPath() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
new file mode 100644
index 000000000000..75d73f345f21
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** A {@link SimpleFileEntry} that additionally carries the deletion vector (dv) file name. */
+public class SimpleFileEntryWithDV extends SimpleFileEntry {
+
+ @Nullable private final String dvFileName;
+
+ public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) {
+ super(
+ entry.kind(),
+ entry.partition(),
+ entry.bucket(),
+ entry.totalBuckets(),
+ entry.level(),
+ entry.fileName(),
+ entry.extraFiles(),
+ entry.embeddedIndex(),
+ entry.minKey(),
+ entry.maxKey(),
+ entry.externalPath());
+ this.dvFileName = dvFileName;
+ }
+
+ public Identifier identifier() {
+ return new IdentifierWithDv(super.identifier(), dvFileName);
+ }
+
+ @Nullable
+ public String dvFileName() {
+ return dvFileName;
+ }
+
+ public SimpleFileEntry toDelete() {
+ return new SimpleFileEntryWithDV(super.toDelete(), dvFileName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ SimpleFileEntryWithDV that = (SimpleFileEntryWithDV) o;
+ return Objects.equals(dvFileName, that.dvFileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), dvFileName);
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", {dvFileName=" + dvFileName + '}';
+ }
+
+    /**
+     * An {@link Identifier} that also includes the dv file name: two entries are considered the
+     * same only when they refer to the same data file and the same deletion vector file.
+     */
+ static class IdentifierWithDv extends Identifier {
+
+ private final String dvFileName;
+
+ public IdentifierWithDv(Identifier identifier, String dvFileName) {
+ super(
+ identifier.partition,
+ identifier.bucket,
+ identifier.level,
+ identifier.fileName,
+ identifier.extraFiles,
+ identifier.embeddedIndex,
+ identifier.externalPath);
+ this.dvFileName = dvFileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ IdentifierWithDv that = (IdentifierWithDv) o;
+ return Objects.equals(dvFileName, that.dvFileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), dvFileName);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
index 597e2847b55b..7e40ddaa8660 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
@@ -20,6 +20,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
@@ -40,6 +41,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.function.Supplier;
/** A buffer to cache {@link KeyValue}s. */
public interface KeyValueBuffer {
@@ -65,6 +67,12 @@ public HybridBuffer(int threshold, LazyField lazyBinaryBuffer) {
this.lazyBinaryBuffer = lazyBinaryBuffer;
}
+ @Nullable
+ @VisibleForTesting
+ BinaryBuffer binaryBuffer() {
+ return binaryBuffer;
+ }
+
@Override
public void reset() {
listBuffer.reset();
@@ -89,7 +97,9 @@ public void put(KeyValue kv) {
private void spillToBinary() {
BinaryBuffer binaryBuffer = lazyBinaryBuffer.get();
try (CloseableIterator iterator = listBuffer.iterator()) {
- binaryBuffer.put(iterator.next());
+ while (iterator.hasNext()) {
+ binaryBuffer.put(iterator.next());
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -162,20 +172,20 @@ public CloseableIterator iterator() {
return new CloseableIterator() {
private boolean hasNextWasCalled = false;
- private boolean hasNext = false;
+ private boolean nextResult = false;
@Override
public boolean hasNext() {
if (!hasNextWasCalled) {
- hasNext = iterator.advanceNext();
+ nextResult = iterator.advanceNext();
hasNextWasCalled = true;
}
- return hasNext;
+ return nextResult;
}
@Override
public KeyValue next() {
- if (!hasNext) {
+ if (!hasNext()) {
throw new NoSuchElementException();
}
hasNextWasCalled = false;
@@ -215,6 +225,17 @@ static BinaryBuffer createBinaryBuffer(
return new BinaryBuffer(buffer, kvSerializer);
}
+ static HybridBuffer createHybridBuffer(
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType,
+ @Nullable IOManager ioManager) {
+ Supplier binarySupplier =
+ () -> createBinaryBuffer(options, keyType, valueType, ioManager);
+ int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold();
+ return new HybridBuffer(threshold, new LazyField<>(binarySupplier));
+ }
+
static void insertInto(
KeyValueBuffer buffer, KeyValue highLevel, Comparator comparator) {
List newCandidates = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index bcd0692757b4..1bd9aaa84339 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -22,17 +22,12 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.LazyField;
import javax.annotation.Nullable;
import java.util.Comparator;
-import java.util.function.Supplier;
-
-import static org.apache.paimon.mergetree.compact.KeyValueBuffer.createBinaryBuffer;
/**
* A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record,
@@ -54,11 +49,7 @@ public LookupMergeFunction(
RowType valueType,
@Nullable IOManager ioManager) {
this.mergeFunction = mergeFunction;
- Supplier binarySupplier =
- () -> createBinaryBuffer(options, keyType, valueType, ioManager);
- int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold();
- this.candidates =
- new KeyValueBuffer.HybridBuffer(threshold, new LazyField<>(binarySupplier));
+ this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType, valueType, ioManager);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index d9e706f6e853..6fbd30508568 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -86,7 +86,7 @@ public FieldCollectAgg(String name, ArrayType dataType, boolean distinct) {
public Object aggReversed(Object accumulator, Object inputField) {
// we don't need to actually do the reverse here for this agg
// because accumulator has been distinct, just let accumulator be accumulator will speed up
- // dinstinct process
+ // distinct process
return agg(accumulator, inputField);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
index 005bf7b17f1f..b2848b35b410 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
@@ -36,6 +36,7 @@
import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;
+import static org.apache.paimon.options.ConfigOptions.key;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
@@ -51,7 +52,10 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
@Nullable private final Projection keyProjection;
@Nullable private final RecordEqualiser elementEqualiser;
- public FieldNestedUpdateAgg(String name, ArrayType dataType, List nestedKey) {
+ private final int countLimit;
+
+ public FieldNestedUpdateAgg(
+ String name, ArrayType dataType, List nestedKey, int countLimit) {
super(name, dataType);
RowType nestedType = (RowType) dataType.getElementType();
this.nestedFields = nestedType.getFieldCount();
@@ -62,6 +66,9 @@ public FieldNestedUpdateAgg(String name, ArrayType dataType, List nested
this.keyProjection = newProjection(nestedType, nestedKey);
this.elementEqualiser = null;
}
+
+ // If deduplicate key is set, we don't guarantee that the result is exactly right
+ this.countLimit = countLimit;
}
@Override
@@ -73,9 +80,15 @@ public Object agg(Object accumulator, Object inputField) {
InternalArray acc = (InternalArray) accumulator;
InternalArray input = (InternalArray) inputField;
+ if (acc.size() >= countLimit) {
+ return accumulator;
+ }
+
+ int remainCount = countLimit - acc.size();
+
List rows = new ArrayList<>(acc.size() + input.size());
addNonNullRows(acc, rows);
- addNonNullRows(input, rows);
+ addNonNullRows(input, rows, remainCount);
if (keyProjection != null) {
Map map = new HashMap<>();
@@ -141,4 +154,18 @@ private void addNonNullRows(InternalArray array, List rows) {
rows.add(array.getRow(i, nestedFields));
}
}
+
+ private void addNonNullRows(InternalArray array, List rows, int remainSize) {
+ int count = 0;
+ for (int i = 0; i < array.size(); i++) {
+ if (count >= remainSize) {
+ return;
+ }
+ if (array.isNullAt(i)) {
+ continue;
+ }
+ rows.add(array.getRow(i, nestedFields));
+ count++;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
index 43d8eb429f07..070931e01135 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
@@ -36,7 +36,10 @@ public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory {
@Override
public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions options, String field) {
- return createFieldNestedUpdateAgg(fieldType, options.fieldNestedUpdateAggNestedKey(field));
+ return createFieldNestedUpdateAgg(
+ fieldType,
+ options.fieldNestedUpdateAggNestedKey(field),
+ options.fieldNestedUpdateAggCountLimit(field));
}
@Override
@@ -45,7 +48,7 @@ public String identifier() {
}
private FieldNestedUpdateAgg createFieldNestedUpdateAgg(
- DataType fieldType, List nestedKey) {
+ DataType fieldType, List nestedKey, int countLimit) {
if (nestedKey == null) {
nestedKey = Collections.emptyList();
}
@@ -56,6 +59,6 @@ private FieldNestedUpdateAgg createFieldNestedUpdateAgg(
ArrayType arrayType = (ArrayType) fieldType;
checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);
- return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey);
+ return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey, countLimit);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index b21c77db1d01..7ed77406c777 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -196,6 +196,12 @@ public FileStoreScan withLevelFilter(Filter levelFilter) {
return this;
}
+ @Override
+ public FileStoreScan withLevelMinMaxFilter(BiFilter minMaxFilter) {
+ manifestsReader.withLevelMinMaxFilter(minMaxFilter);
+ return this;
+ }
+
@Override
public FileStoreScan enableValueFilter() {
return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 7b77fb179f6d..ddf2addf5313 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -28,9 +28,9 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -213,22 +213,26 @@ public List prepareCommit(boolean waitCompaction, long commitIden
WriterContainer writerContainer = entry.getValue();
CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
- List newIndexFiles = new ArrayList<>();
+ DataIncrement newFilesIncrement = increment.newFilesIncrement();
+ CompactIncrement compactIncrement = increment.compactIncrement();
if (writerContainer.dynamicBucketMaintainer != null) {
- newIndexFiles.addAll(writerContainer.dynamicBucketMaintainer.prepareCommit());
+ newFilesIncrement
+ .newIndexFiles()
+ .addAll(writerContainer.dynamicBucketMaintainer.prepareCommit());
}
CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
if (compactDeletionFile != null) {
- compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
+ compactDeletionFile
+ .getOrCompute()
+ .ifPresent(compactIncrement.newIndexFiles()::add);
}
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
writerContainer.totalBuckets,
- increment.newFilesIncrement(),
- increment.compactIncrement(),
- new IndexIncrement(newIndexFiles));
+ newFilesIncrement,
+ compactIncrement);
result.add(committable);
if (committable.isEmpty()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
new file mode 100644
index 000000000000..fad67e318320
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.SnapshotManager;
+
+/** {@link FileStoreScan} for data-evolution enabled table. */
+public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
+
+ public DataEvolutionFileStoreScan(
+ ManifestsReader manifestsReader,
+ BucketSelectConverter bucketSelectConverter,
+ SnapshotManager snapshotManager,
+ SchemaManager schemaManager,
+ TableSchema schema,
+ ManifestFile.Factory manifestFileFactory,
+ Integer scanManifestParallelism) {
+ super(
+ manifestsReader,
+ bucketSelectConverter,
+ snapshotManager,
+ schemaManager,
+ schema,
+ manifestFileFactory,
+ scanManifestParallelism,
+ false);
+ }
+
+ public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+ return this;
+ }
+
+ /** Note: Keep this thread-safe. */
+ @Override
+ protected boolean filterByStats(ManifestEntry entry) {
+ return true;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index c2a6ebd18fe3..f4c426e86f12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -37,6 +37,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
@@ -56,7 +57,7 @@
import java.util.stream.Collectors;
import static java.lang.String.format;
-import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -124,7 +125,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio
new Builder(
formatDiscover,
readRowType.getFields(),
- schema -> rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
+ schema -> rowTypeWithRowTracking(schema.logicalRowType(), true).getFields(),
null,
null,
null);
@@ -189,7 +190,8 @@ private DataEvolutionFileReader createUnionReader(
long schemaId = file.schemaId();
TableSchema dataSchema = schemaManager.schema(schemaId).project(file.writeCols());
int[] fieldIds =
- rowTypeWithRowLineage(dataSchema.logicalRowType()).getFields().stream()
+ SpecialFields.rowTypeWithRowTracking(dataSchema.logicalRowType()).getFields()
+ .stream()
.mapToInt(DataField::id)
.toArray();
List readFields = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index dae3d5a0f29c..98a761b47166 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -35,7 +35,7 @@
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.DataFilePathFactories;
-import org.apache.paimon.utils.FileDeletionThreadPool;
+import org.apache.paimon.utils.FileOperationThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -102,7 +102,7 @@ public FileDeletionBase(
this.statsFileHandler = statsFileHandler;
this.cleanEmptyDirectories = cleanEmptyDirectories;
this.deletionBuckets = new HashMap<>();
- this.deleteFileExecutor = FileDeletionThreadPool.getExecutorService(deleteFileThreadNum);
+ this.deleteFileExecutor = FileOperationThreadPool.getExecutorService(deleteFileThreadNum);
}
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 4156ce0a834c..6a00db6f0ee1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable {
* note that this partition does not necessarily equal to the partitions of the newly added
* key-values. This is just the partition to be cleaned up.
*/
- int overwrite(
+ int overwritePartition(
Map partition,
ManifestCommittable committable,
Map properties);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 56fde4e30e43..7df158978188 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -20,11 +20,13 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
@@ -60,7 +62,6 @@
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -69,7 +70,6 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -87,14 +87,16 @@
import static java.util.Collections.emptyList;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -151,6 +153,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
+ private final boolean isPkTable;
+ private final boolean deletionVectorsEnabled;
+ private final IndexFileHandler indexFileHandler;
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
@@ -187,7 +192,10 @@ public FileStoreCommitImpl(
long commitMinRetryWait,
long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot,
- boolean rowTrackingEnabled) {
+ boolean rowTrackingEnabled,
+ boolean isPkTable,
+ boolean deletionVectorsEnabled,
+ IndexFileHandler indexFileHandler) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -231,6 +239,9 @@ public FileStoreCommitImpl(
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
+ this.isPkTable = isPkTable;
+ this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.indexFileHandler = indexFileHandler;
}
@Override
@@ -295,24 +306,24 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
List appendTableFiles = new ArrayList<>();
List appendChangelog = new ArrayList<>();
+ List appendIndexFiles = new ArrayList<>();
List compactTableFiles = new ArrayList<>();
List compactChangelog = new ArrayList<>();
- List appendHashIndexFiles = new ArrayList<>();
- List compactDvIndexFiles = new ArrayList<>();
+ List compactIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
+ appendIndexFiles,
compactTableFiles,
compactChangelog,
- appendHashIndexFiles,
- compactDvIndexFiles);
+ compactIndexFiles);
try {
List appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
- || !appendHashIndexFiles.isEmpty()) {
+ || !appendIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check for conflicts.
@@ -321,17 +332,29 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
+ CommitKind commitKind = CommitKind.APPEND;
+ ConflictCheck conflictCheck = noConflictCheck();
+ if (containsFileDeletionOrDeletionVectors(appendSimpleEntries, appendIndexFiles)) {
+ commitKind = CommitKind.OVERWRITE;
+ conflictCheck = mustConflictCheck();
+ }
+
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
- latestSnapshot, appendTableFiles, compactTableFiles));
+ latestSnapshot,
+ changedPartitions(
+ appendTableFiles,
+ compactTableFiles,
+ appendIndexFiles)));
noConflictsOrFail(
- latestSnapshot.commitUser(),
+ latestSnapshot,
baseEntries,
appendSimpleEntries,
- Snapshot.CommitKind.APPEND);
+ appendIndexFiles,
+ commitKind);
safeLatestSnapshotId = latestSnapshot.id();
}
@@ -339,20 +362,20 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
tryCommit(
appendTableFiles,
appendChangelog,
- appendHashIndexFiles,
+ appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- Snapshot.CommitKind.APPEND,
- noConflictCheck(),
+ commitKind,
+ conflictCheck,
null);
generatedSnapshot += 1;
}
if (!compactTableFiles.isEmpty()
|| !compactChangelog.isEmpty()
- || !compactDvIndexFiles.isEmpty()) {
+ || !compactIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 2:
// Add appendChanges to the manifest entries read above and check for conflicts.
@@ -363,10 +386,11 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendSimpleEntries);
noConflictsOrFail(
- latestSnapshot.commitUser(),
+ latestSnapshot,
baseEntries,
SimpleFileEntry.from(compactTableFiles),
- Snapshot.CommitKind.COMPACT);
+ compactIndexFiles,
+ CommitKind.COMPACT);
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
}
@@ -375,12 +399,12 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
tryCommit(
compactTableFiles,
compactChangelog,
- compactDvIndexFiles,
+ compactIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- Snapshot.CommitKind.COMPACT,
+ CommitKind.COMPACT,
hasConflictChecked(safeLatestSnapshotId),
null);
generatedSnapshot += 1;
@@ -425,8 +449,23 @@ private void reportCommit(
commitMetrics.reportCommit(commitStats);
}
+ private boolean containsFileDeletionOrDeletionVectors(
+ List appendSimpleEntries, List appendIndexFiles) {
+ for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
+ if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
+ return true;
+ }
+ }
+ for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+ if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
- public int overwrite(
+ public int overwritePartition(
Map partition,
ManifestCommittable committable,
Map properties) {
@@ -447,18 +486,18 @@ public int overwrite(
int attempts = 0;
List appendTableFiles = new ArrayList<>();
List appendChangelog = new ArrayList<>();
+ List appendIndexFiles = new ArrayList<>();
List compactTableFiles = new ArrayList<>();
List compactChangelog = new ArrayList<>();
- List appendHashIndexFiles = new ArrayList<>();
- List compactDvIndexFiles = new ArrayList<>();
+ List compactIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
+ appendIndexFiles,
compactTableFiles,
compactChangelog,
- appendHashIndexFiles,
- compactDvIndexFiles);
+ compactIndexFiles);
if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
StringBuilder warnMessage =
@@ -492,7 +531,7 @@ public int overwrite(
partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);
}
} else {
- // partition may be partial partition fields, so here must to use predicate way.
+ // partition may be partial partition fields, so here must use predicate way.
Predicate partitionPredicate =
createPartitionPredicate(partition, partitionType, partitionDefaultName);
partitionFilter =
@@ -515,10 +554,10 @@ public int overwrite(
// overwrite new files
if (!skipOverwrite) {
attempts +=
- tryOverwrite(
+ tryOverwritePartition(
partitionFilter,
appendTableFiles,
- appendHashIndexFiles,
+ appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -526,17 +565,17 @@ public int overwrite(
generatedSnapshot += 1;
}
- if (!compactTableFiles.isEmpty() || !compactDvIndexFiles.isEmpty()) {
+ if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
attempts +=
tryCommit(
compactTableFiles,
emptyList(),
- compactDvIndexFiles,
+ compactIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- Snapshot.CommitKind.COMPACT,
+ CommitKind.COMPACT,
mustConflictCheck(),
null);
generatedSnapshot += 1;
@@ -589,7 +628,7 @@ public void dropPartitions(List