diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f4e0a6d1cd43..6409918761fe 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -146,6 +146,12 @@ String Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true. + +
changelog-read.sequence-number.enabled
+ false + Boolean + Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables. +
changelog.num-retained.max
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 78270d7523bc..7917d7b0eb25 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -827,6 +827,14 @@ public InlineElement getDescription() { .withDescription( "Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true."); + public static final ConfigOption CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED = + key("changelog-read.sequence-number.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. " + + "This is only valid for primary key tables."); + public static final ConfigOption SEQUENCE_FIELD = key("sequence.field") .stringType() @@ -2721,6 +2729,10 @@ public List changelogRowDeduplicateIgnoreFields() { .orElse(Collections.emptyList()); } + public boolean changelogReadSequenceNumberEnabled() { + return options.get(CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED); + } + public boolean scanPlanSortPartition() { return options.get(SCAN_PLAN_SORT_PARTITION); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index abbc2e4912c3..3711900ac5e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -245,6 +245,8 @@ public static void validateTableSchema(TableSchema schema) { validateIncrementalClustering(schema, options); validateChainTable(schema, options); + + validateChangelogReadSequenceNumber(schema, options); } public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) { @@ -713,4 
+715,15 @@ public static void validateChainTable(TableSchema schema, CoreOptions options) { "Partition timestamp formatter is required for chain table."); } } + + private static void validateChangelogReadSequenceNumber( + TableSchema schema, CoreOptions options) { + if (options.changelogReadSequenceNumberEnabled()) { + checkArgument( + !schema.primaryKeys().isEmpty(), + "Cannot enable '%s' for non-primary-key table. " + + "Sequence number is only available for primary key tables.", + CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key()); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index fde63285a42d..da694671433f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.function.Supplier; /** @@ -161,14 +162,17 @@ public RecordReader reader(Split split) throws IOException { throw new RuntimeException("Should not happen."); } - public static RecordReader unwrap(RecordReader reader) { + public static RecordReader unwrap( + RecordReader reader, Map schemaOptions) { return new RecordReader() { @Nullable @Override public RecordIterator readBatch() throws IOException { RecordIterator batch = reader.readBatch(); - return batch == null ? null : new ValueContentRowDataRecordIterator(batch); + return batch == null + ? 
null + : new ValueContentRowDataRecordIterator(batch, schemaOptions); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java index adcf504a5d38..38dcbb04848a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java @@ -18,17 +18,31 @@ package org.apache.paimon.table.source; +import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; import org.apache.paimon.reader.RecordReader; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. 
*/ public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterator { + private final boolean includeSequenceNumber; + public ValueContentRowDataRecordIterator(RecordReader.RecordIterator kvIterator) { + this(kvIterator, new HashMap<>(1)); + } + + public ValueContentRowDataRecordIterator( + RecordReader.RecordIterator kvIterator, Map schemaOptions) { super(kvIterator); + this.includeSequenceNumber = + CoreOptions.fromMap(schemaOptions).changelogReadSequenceNumberEnabled(); } @Override @@ -40,6 +54,15 @@ public InternalRow next() throws IOException { InternalRow rowData = kv.value(); rowData.setRowKind(kv.valueKind()); + + if (includeSequenceNumber) { + JoinedRow joinedRow = new JoinedRow(); + GenericRow systemFieldsRow = new GenericRow(1); + systemFieldsRow.setField(0, kv.sequenceNumber()); + joinedRow.replace(systemFieldsRow, rowData); + joinedRow.setRowKind(kv.valueKind()); + return joinedRow; + } return rowData; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java index ee3d84b9544f..abe2f1ced20d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java @@ -75,7 +75,7 @@ private SplitRead create(Supplier supplier) { dataSplit.dataFiles(), dataSplit.deletionFiles().orElse(null), false)); - return unwrap(reader); + return unwrap(reader, read.tableSchema().options()); }; return SplitRead.convert(read, convertedFactory); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index ec4408fb9737..7655bd3b35f6 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -117,7 +117,7 @@ public RecordReader createReader(Split s) throws IOException { ProjectedRow.from(readType, mergeRead.tableSchema().logicalRowType()); reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); } - return KeyValueTableRead.unwrap(reader); + return KeyValueTableRead.unwrap(reader, mergeRead.tableSchema().options()); } private static RecordReader readDiff( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java index d69209ef4e66..da9692a8d4a0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java @@ -49,7 +49,8 @@ public MergeFileSplitReadProvider( private SplitRead create(Supplier supplier) { final MergeFileSplitRead read = supplier.get().withReadKeyType(RowType.of()); - return SplitRead.convert(read, split -> unwrap(read.createReader(split))); + return SplitRead.convert( + read, split -> unwrap(read.createReader(split), read.tableSchema().options())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index d590e1eb5536..b6bf179c0916 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -88,24 +88,34 @@ public class AuditLogTable implements DataTable, ReadonlyTable { public static final String AUDIT_LOG = "audit_log"; - public static final PredicateReplaceVisitor 
PREDICATE_CONVERTER = - p -> { - if (p.index() == 0) { - return Optional.empty(); - } - return Optional.of( - new LeafPredicate( - p.function(), - p.type(), - p.index() - 1, - p.fieldName(), - p.literals())); - }; - private final FileStoreTable wrapped; + /** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */ + protected final int specialFieldCount; + public AuditLogTable(FileStoreTable wrapped) { this.wrapped = wrapped; + this.specialFieldCount = + coreOptions().changelogReadSequenceNumberEnabled() + && !wrapped.primaryKeys().isEmpty() + ? 2 + : 1; + } + + /** Creates a PredicateReplaceVisitor that adjusts field indices by systemFieldCount. */ + private PredicateReplaceVisitor createPredicateConverter() { + return p -> { + if (p.index() < specialFieldCount) { + return Optional.empty(); + } + return Optional.of( + new LeafPredicate( + p.function(), + p.type(), + p.index() - specialFieldCount, + p.fieldName(), + p.literals())); + }; } @Override @@ -142,6 +152,9 @@ public String name() { public RowType rowType() { List fields = new ArrayList<>(); fields.add(SpecialFields.ROW_KIND); + if (specialFieldCount > 1) { + fields.add(SpecialFields.SEQUENCE_NUMBER); + } fields.addAll(wrapped.rowType().getFields()); return new RowType(fields); } @@ -238,9 +251,10 @@ public FileIO fileIO() { /** Push down predicate to dataScan and dataRead. 
*/ private Optional convert(Predicate predicate) { + PredicateReplaceVisitor converter = createPredicateConverter(); List result = PredicateBuilder.splitAnd(predicate).stream() - .map(p -> p.visit(PREDICATE_CONVERTER)) + .map(p -> p.visit(converter)) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); @@ -630,6 +644,11 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta class AuditLogRead implements InnerTableRead { + // Special index for rowkind field + protected static final int ROW_KIND_INDEX = -1; + // Special index for _SEQUENCE_NUMBER field + protected static final int SEQUENCE_NUMBER_INDEX = -2; + protected final InnerTableRead dataRead; protected int[] readProjection; @@ -639,52 +658,61 @@ protected AuditLogRead(InnerTableRead dataRead) { this.readProjection = defaultProjection(); } - /** Default projection, just add row kind to the first. */ + /** Default projection, add system fields (rowkind, and optionally _SEQUENCE_NUMBER). */ private int[] defaultProjection() { int dataFieldCount = wrapped.rowType().getFieldCount(); - int[] projection = new int[dataFieldCount + 1]; - projection[0] = -1; + int[] projection = new int[dataFieldCount + specialFieldCount]; + projection[0] = ROW_KIND_INDEX; + if (specialFieldCount > 1) { + projection[1] = SEQUENCE_NUMBER_INDEX; + } for (int i = 0; i < dataFieldCount; i++) { - projection[i + 1] = i; + projection[specialFieldCount + i] = i + specialFieldCount - 1; } return projection; } - @Override - public InnerTableRead withFilter(Predicate predicate) { - convert(predicate).ifPresent(dataRead::withFilter); - return this; - } - - @Override - public InnerTableRead withReadType(RowType readType) { - // data projection to push down to dataRead - List dataReadFields = new ArrayList<>(); - - // read projection to handle record returned by dataRead + /** Build projection array from readType. 
*/ + private int[] buildProjection(RowType readType) { List fields = readType.getFields(); - int[] readProjection = new int[fields.size()]; + int[] projection = new int[fields.size()]; + int dataFieldIndex = 0; - boolean rowKindAppeared = false; for (int i = 0; i < fields.size(); i++) { String fieldName = fields.get(i).name(); if (fieldName.equals(SpecialFields.ROW_KIND.name())) { - rowKindAppeared = true; - readProjection[i] = -1; + projection[i] = ROW_KIND_INDEX; + } else if (fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) { + projection[i] = SEQUENCE_NUMBER_INDEX; } else { - dataReadFields.add(fields.get(i)); - // There is no row kind field. Keep it as it is - // Row kind field has occurred, and the following fields are offset by 1 - // position - readProjection[i] = rowKindAppeared ? i - 1 : i; + projection[i] = dataFieldIndex + specialFieldCount - 1; + dataFieldIndex++; } } + return projection; + } + + @Override + public InnerTableRead withFilter(Predicate predicate) { + convert(predicate).ifPresent(dataRead::withFilter); + return this; + } - this.readProjection = readProjection; - dataRead.withReadType(new RowType(readType.isNullable(), dataReadFields)); + @Override + public InnerTableRead withReadType(RowType readType) { + this.readProjection = buildProjection(readType); + List dataFields = extractDataFields(readType); + dataRead.withReadType(new RowType(readType.isNullable(), dataFields)); return this; } + /** Extract data fields (non-system fields) from readType. */ + private List extractDataFields(RowType readType) { + return readType.getFields().stream() + .filter(f -> !SpecialFields.isSystemField(f.name())) + .collect(Collectors.toList()); + } + @Override public TableRead withIOManager(IOManager ioManager) { this.dataRead.withIOManager(ioManager); @@ -701,7 +729,10 @@ private InternalRow convertRow(InternalRow data) { } } - /** A {@link ProjectedRow} which returns row kind when mapping index is negative. 
*/ + /** + * A {@link ProjectedRow} which returns row kind and sequence number when mapping index is + * negative. + */ static class AuditLogRow extends ProjectedRow { AuditLogRow(int[] indexMapping, InternalRow row) { @@ -723,15 +754,26 @@ public void setRowKind(RowKind kind) { @Override public boolean isNullAt(int pos) { if (indexMapping[pos] < 0) { - // row kind is always not null + // row kind and sequence num are always not null return false; } return super.isNullAt(pos); } + @Override + public long getLong(int pos) { + int index = indexMapping[pos]; + if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) { + // _SEQUENCE_NUMBER is at index 0 in bottom output + return row.getLong(0); + } + return super.getLong(pos); + } + @Override public BinaryString getString(int pos) { - if (indexMapping[pos] < 0) { + int index = indexMapping[pos]; + if (index == AuditLogRead.ROW_KIND_INDEX) { return BinaryString.fromString(row.getRowKind().shortString()); } return super.getString(pos); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java index cc8d1621a3fa..66ed8bc2f9b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java @@ -72,6 +72,9 @@ public String name() { public RowType rowType() { List fields = new ArrayList<>(); fields.add(SpecialFields.ROW_KIND); + if (specialFieldCount > 1) { + fields.add(SpecialFields.SEQUENCE_NUMBER); + } for (DataField field : wrapped.rowType().getFields()) { // convert to nullable fields.add(field.newType(new ArrayType(field.type().nullable()))); @@ -102,7 +105,7 @@ public InnerTableRead withReadType(RowType readType) { List fields = new ArrayList<>(); List wrappedReadFields = new ArrayList<>(); for (DataField field : readType.getFields()) { - if (field.name().equals(SpecialFields.ROW_KIND.name())) { + if 
(SpecialFields.isSystemField(field.name())) { fields.add(field); } else { DataField origin = field.newType(((ArrayType) field.type()).getElementType()); @@ -117,12 +120,16 @@ public InnerTableRead withReadType(RowType readType) { @Override public RecordReader createReader(Split split) throws IOException { DataSplit dataSplit = (DataSplit) split; + // When sequence number is enabled, the underlying data layout is: + // [_SEQUENCE_NUMBER, pk, pt, col1, ...] + // We need to offset the field index to skip the sequence number field. + int offset = specialFieldCount - 1; InternalRow.FieldGetter[] fieldGetters = IntStream.range(0, wrappedReadType.getFieldCount()) .mapToObj( i -> InternalRow.createFieldGetter( - wrappedReadType.getTypeAt(i), i)) + wrappedReadType.getTypeAt(i), i + offset)) .toArray(InternalRow.FieldGetter[]::new); if (dataSplit.isStreaming()) { @@ -146,15 +153,23 @@ private InternalRow convertToArray( InternalRow row1, @Nullable InternalRow row2, InternalRow.FieldGetter[] fieldGetters) { - GenericRow row = new GenericRow(row1.getFieldCount()); - for (int i = 0; i < row1.getFieldCount(); i++) { + // seqOffset is 1 if sequence number is enabled, 0 otherwise + int seqOffset = specialFieldCount - 1; + GenericRow row = new GenericRow(fieldGetters.length + seqOffset); + + // Copy sequence number if enabled (it's at index 0 in input row) + if (seqOffset > 0) { + row.setField(0, row1.getLong(0)); + } + + for (int i = 0; i < fieldGetters.length; i++) { Object o1 = fieldGetters[i].getFieldOrNull(row1); Object o2; if (row2 != null) { o2 = fieldGetters[i].getFieldOrNull(row2); - row.setField(i, new GenericArray(new Object[] {o1, o2})); + row.setField(i + seqOffset, new GenericArray(new Object[] {o1, o2})); } else { - row.setField(i, new GenericArray(new Object[] {o1})); + row.setField(i + seqOffset, new GenericArray(new Object[] {o1})); } } // If no row2 provided, then follow the row1 kind. 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java index 41facd1d837f..36031884ce71 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java @@ -27,6 +27,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; @@ -36,7 +37,6 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -48,15 +48,62 @@ /** Unit tests for {@link AuditLogTable}. */ public class AuditLogTableTest extends TableTestBase { - private static final String tableName = "MyTable"; - private AuditLogTable auditLogTable; + @Test + public void testReadAuditLogFromLatest() throws Exception { + AuditLogTable auditLogTable = createAuditLogTable("audit_table", false); + assertThat(auditLogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "pk", "pt", "col1"); + List expectRow = getExpectedResult(); + List result = read(auditLogTable); + assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); + } + + @Test + public void testReadSequenceNumberWithTableOption() throws Exception { + AuditLogTable auditLogTable = createAuditLogTable("audit_table_with_seq", true); + assertThat(auditLogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1"); + + List result = read(auditLogTable); + List expectRow = getExpectedResultWithSequenceNumber(); + assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); + } + + @Test + 
public void testReadSequenceNumberWithAlterTable() throws Exception { + String tableName = "audit_table_alter_seq"; + // Create table without sequence-number option + AuditLogTable auditLogTable = createAuditLogTable(tableName, false); + assertThat(auditLogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "pk", "pt", "col1"); + + // Add sequence-number option via alterTable + catalog.alterTable( + identifier(tableName), + SchemaChange.setOption( + CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"), + false); - @BeforeEach - public void before() throws Exception { + // Re-fetch the audit_log table to get updated schema + Identifier auditLogTableId = + identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG); + AuditLogTable updatedAuditLogTable = (AuditLogTable) catalog.getTable(auditLogTableId); + + // Verify schema now includes _SEQUENCE_NUMBER + assertThat(updatedAuditLogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1"); + + List result = read(updatedAuditLogTable); + List expectRow = getExpectedResultWithSequenceNumber(); + assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); + } + + private AuditLogTable createAuditLogTable(String tableName, boolean enableSequenceNumber) + throws Exception { Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName)); FileIO fileIO = LocalFileIO.create(); - Schema schema = + Schema.Builder schemaBuilder = Schema.newBuilder() .column("pk", DataTypes.INT()) .column("pt", DataTypes.INT()) @@ -64,44 +111,54 @@ public void before() throws Exception { .partitionKeys("pt") .primaryKey("pk", "pt") .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input") - .option("bucket", "1") - .build(); + .option("bucket", "1"); + if (enableSequenceNumber) { + schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"); + } TableSchema tableSchema = - SchemaUtils.forceCommit(new 
SchemaManager(fileIO, tablePath), schema); + SchemaUtils.forceCommit( + new SchemaManager(fileIO, tablePath), schemaBuilder.build()); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); - Identifier filesTableId = + + writeTestData(table); + + Identifier auditLogTableId = identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG); - auditLogTable = (AuditLogTable) catalog.getTable(filesTableId); + return (AuditLogTable) catalog.getTable(auditLogTableId); + } + private void writeTestData(FileStoreTable table) throws Exception { write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1)); write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1)); write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5)); write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5)); - write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6)); + write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6)); write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1)); } - @Test - public void testReadAuditLogFromLatest() throws Exception { - List expectRow = getExpectedResult(); - List result = read(auditLogTable); - assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); - } - private List getExpectedResult() { List expectedRow = new ArrayList<>(); expectedRow.add( GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1)); expectedRow.add( GenericRow.of( - BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5)); + BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6)); + expectedRow.add( + GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1)); + return expectedRow; + } + + private List getExpectedResultWithSequenceNumber() { + List expectedRow = new ArrayList<>(); + expectedRow.add( + GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1, 1)); expectedRow.add( GenericRow.of( - 
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 4, 6)); + BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 2L, 1, 2, 6)); expectedRow.add( - GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1)); + GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 0L, 2, 3, 1)); return expectedRow; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java new file mode 100644 index 000000000000..e6feb8e8f1b1 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.system; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link BinlogTable}. 
*/ +public class BinlogTableTest extends TableTestBase { + + @Test + public void testReadBinlogFromLatest() throws Exception { + BinlogTable binlogTable = createBinlogTable("binlog_table", false); + assertThat(binlogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "pk", "pt", "col1"); + + List result = read(binlogTable); + List expectRow = getExpectedResult(); + assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); + } + + @Test + public void testReadSequenceNumberWithTableOption() throws Exception { + BinlogTable binlogTable = createBinlogTable("binlog_table_with_seq", true); + assertThat(binlogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1"); + + List result = read(binlogTable); + List expectRow = getExpectedResultWithSequenceNumber(); + assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); + } + + @Test + public void testReadSequenceNumberWithAlterTable() throws Exception { + String tableName = "binlog_table_alter_seq"; + // Create table without sequence-number option + BinlogTable binlogTable = createBinlogTable(tableName, false); + assertThat(binlogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "pk", "pt", "col1"); + + // Add sequence-number option via alterTable + catalog.alterTable( + identifier(tableName), + SchemaChange.setOption( + CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"), + false); + + // Re-fetch the binlog table to get updated schema + Identifier binlogTableId = + identifier(tableName + SYSTEM_TABLE_SPLITTER + BinlogTable.BINLOG); + BinlogTable updatedBinlogTable = (BinlogTable) catalog.getTable(binlogTableId); + + // Verify schema now includes _SEQUENCE_NUMBER + assertThat(updatedBinlogTable.rowType().getFieldNames()) + .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1"); + + List result = read(updatedBinlogTable); + List expectRow = getExpectedResultWithSequenceNumber(); + 
assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow); + } + + private BinlogTable createBinlogTable(String tableName, boolean enableSequenceNumber) + throws Exception { + Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName)); + FileIO fileIO = LocalFileIO.create(); + + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("pk", DataTypes.INT()) + .column("pt", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input") + .option("bucket", "1"); + if (enableSequenceNumber) { + schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"); + } + + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(fileIO, tablePath), schemaBuilder.build()); + FileStoreTable table = + FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); + + writeTestData(table); + + Identifier binlogTableId = + identifier(tableName + SYSTEM_TABLE_SPLITTER + BinlogTable.BINLOG); + return (BinlogTable) catalog.getTable(binlogTableId); + } + + private void writeTestData(FileStoreTable table) throws Exception { + write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1)); + write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1)); + write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5)); + write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5)); + write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6)); + write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1)); + } + + private List getExpectedResult() { + List expectedRow = new ArrayList<>(); + expectedRow.add( + GenericRow.of( + BinaryString.fromString(RowKind.DELETE.shortString()), + new GenericArray(new Object[] {1}), + new GenericArray(new Object[] {1}), + new GenericArray(new Object[] {1}))); + expectedRow.add( + GenericRow.of( + BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), + new GenericArray(new 
Object[] {1}), + new GenericArray(new Object[] {2}), + new GenericArray(new Object[] {6}))); + expectedRow.add( + GenericRow.of( + BinaryString.fromString(RowKind.INSERT.shortString()), + new GenericArray(new Object[] {2}), + new GenericArray(new Object[] {3}), + new GenericArray(new Object[] {1}))); + return expectedRow; + } + + private List getExpectedResultWithSequenceNumber() { + List expectedRow = new ArrayList<>(); + expectedRow.add( + GenericRow.of( + BinaryString.fromString(RowKind.DELETE.shortString()), + 1L, + new GenericArray(new Object[] {1}), + new GenericArray(new Object[] {1}), + new GenericArray(new Object[] {1}))); + expectedRow.add( + GenericRow.of( + BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), + 2L, + new GenericArray(new Object[] {1}), + new GenericArray(new Object[] {2}), + new GenericArray(new Object[] {6}))); + expectedRow.add( + GenericRow.of( + BinaryString.fromString(RowKind.INSERT.shortString()), + 0L, + new GenericArray(new Object[] {2}), + new GenericArray(new Object[] {3}), + new GenericArray(new Object[] {1}))); + return expectedRow; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java index f955418b6d21..c2a17412fe78 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java @@ -29,14 +29,13 @@ import org.apache.paimon.table.source.AbstractDataTableRead; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; import java.io.IOException; 
-import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; - /** An {@link InnerTableRead} that reads the data changed before and after compaction. */ public class LookupCompactDiffRead extends AbstractDataTableRead { private final SplitRead fullPhaseMergeRead; @@ -46,7 +45,11 @@ public LookupCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema schema) { super(schema); this.incrementalDiffRead = new IncrementalCompactDiffSplitRead(mergeRead); this.fullPhaseMergeRead = - SplitRead.convert(mergeRead, split -> unwrap(mergeRead.createReader(split))); + SplitRead.convert( + mergeRead, + split -> + KeyValueTableRead.unwrap( + mergeRead.createReader(split), schema.options())); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 22271c8a7ebe..bea70497071b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -1053,6 +1053,104 @@ public void testBinlogTableWithProjection() { .containsExactly(Row.of("+I", new String[] {"A"})); } + @Test + public void testAuditLogTableWithSequenceNumberEnabled() { + // Create primary key table with changelog-read.sequence-number.enabled option + sql( + "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) " + + "WITH ('changelog-read.sequence-number.enabled'='true');"); + sql("INSERT INTO test_table_seq VALUES (1, 2)"); + sql("INSERT INTO test_table_seq VALUES (3, 4)"); + + // Test SELECT * includes _SEQUENCE_NUMBER + assertThat(sql("SELECT * FROM `test_table_seq$audit_log`")) + .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), Row.of("+I", 1L, 3, 4, 7)); + + // Test out-of-order select with _SEQUENCE_NUMBER + assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM 
`test_table_seq$audit_log`")) + .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L)); + + // Test selecting only _SEQUENCE_NUMBER, rowkind + assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_seq$audit_log`")) + .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I")); + } + + @Test + public void testAuditLogTableWithSequenceNumberAlterTable() { + // Create primary key table without sequence-number option + sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b);"); + sql("INSERT INTO test_table_dyn VALUES (1, 2)"); + sql("INSERT INTO test_table_dyn VALUES (3, 4)"); + + // Add changelog-read.sequence-number.enabled option via ALTER TABLE + sql("ALTER TABLE test_table_dyn SET ('changelog-read.sequence-number.enabled'='true')"); + + // Test SELECT * includes _SEQUENCE_NUMBER (same as + // testAuditLogTableWithSequenceNumberEnabled) + assertThat(sql("SELECT * FROM `test_table_dyn$audit_log`")) + .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), Row.of("+I", 1L, 3, 4, 7)); + + // Test out-of-order select with _SEQUENCE_NUMBER + assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM `test_table_dyn$audit_log`")) + .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L)); + + // Test selecting only _SEQUENCE_NUMBER, rowkind + assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_dyn$audit_log`")) + .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I")); + } + + @Test + public void testBinlogTableWithSequenceNumberEnabled() { + // Create primary key table with changelog-read.sequence-number.enabled option + sql( + "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int) " + + "WITH ('changelog-read.sequence-number.enabled'='true');"); + sql("INSERT INTO test_table_seq VALUES (1, 2)"); + sql("INSERT INTO test_table_seq VALUES (3, 4)"); + + // Test SELECT * includes _SEQUENCE_NUMBER + assertThat(sql("SELECT * FROM `test_table_seq$binlog`")) + .containsExactlyInAnyOrder( + 
Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}), + Row.of("+I", 1L, new Integer[] {3}, new Integer[] {4})); + + // Test out-of-order select with _SEQUENCE_NUMBER + assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM `test_table_seq$binlog`")) + .containsExactlyInAnyOrder( + Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] {4}, 1L)); + + // Test selecting only _SEQUENCE_NUMBER, rowkind + assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_seq$binlog`")) + .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I")); + } + + @Test + public void testBinlogTableWithSequenceNumberAlterTable() { + // Create primary key table without sequence-number option + sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b int);"); + sql("INSERT INTO test_table_dyn VALUES (1, 2)"); + sql("INSERT INTO test_table_dyn VALUES (3, 4)"); + + // Add changelog-read.sequence-number.enabled option via ALTER TABLE + sql("ALTER TABLE test_table_dyn SET ('changelog-read.sequence-number.enabled'='true')"); + + // Test SELECT * includes _SEQUENCE_NUMBER (same as + // testBinlogTableWithSequenceNumberEnabled) + assertThat(sql("SELECT * FROM `test_table_dyn$binlog`")) + .containsExactlyInAnyOrder( + Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}), + Row.of("+I", 1L, new Integer[] {3}, new Integer[] {4})); + + // Test out-of-order select with _SEQUENCE_NUMBER + assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM `test_table_dyn$binlog`")) + .containsExactlyInAnyOrder( + Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] {4}, 1L)); + + // Test selecting only _SEQUENCE_NUMBER, rowkind + assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_dyn$binlog`")) + .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I")); + } + @Test public void testBatchReadSourceWithSnapshot() { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222), (3, 33, 333), (4, 44, 444)");