diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f4e0a6d1cd43..6409918761fe 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@
String |
Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true. |
+
+ changelog-read.sequence-number.enabled |
+ false |
+ Boolean |
+ Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables. |
+
changelog.num-retained.max |
(none) |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 78270d7523bc..7917d7b0eb25 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -827,6 +827,14 @@ public InlineElement getDescription() {
.withDescription(
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");
+ public static final ConfigOption CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED =
+ key("changelog-read.sequence-number.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. "
+ + "This is only valid for primary key tables.");
+
public static final ConfigOption SEQUENCE_FIELD =
key("sequence.field")
.stringType()
@@ -2721,6 +2729,10 @@ public List changelogRowDeduplicateIgnoreFields() {
.orElse(Collections.emptyList());
}
+ public boolean changelogReadSequenceNumberEnabled() {
+ return options.get(CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED);
+ }
+
public boolean scanPlanSortPartition() {
return options.get(SCAN_PLAN_SORT_PARTITION);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index abbc2e4912c3..3711900ac5e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -245,6 +245,8 @@ public static void validateTableSchema(TableSchema schema) {
validateIncrementalClustering(schema, options);
validateChainTable(schema, options);
+
+ validateChangelogReadSequenceNumber(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -713,4 +715,15 @@ public static void validateChainTable(TableSchema schema, CoreOptions options) {
"Partition timestamp formatter is required for chain table.");
}
}
+
+ private static void validateChangelogReadSequenceNumber(
+ TableSchema schema, CoreOptions options) {
+ if (options.changelogReadSequenceNumberEnabled()) {
+ checkArgument(
+ !schema.primaryKeys().isEmpty(),
+ "Cannot enable '%s' for non-primary-key table. "
+ + "Sequence number is only available for primary key tables.",
+ CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key());
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index fde63285a42d..da694671433f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
/**
@@ -161,14 +162,17 @@ public RecordReader reader(Split split) throws IOException {
throw new RuntimeException("Should not happen.");
}
- public static RecordReader unwrap(RecordReader reader) {
+ public static RecordReader unwrap(
+ RecordReader reader, Map schemaOptions) {
return new RecordReader() {
@Nullable
@Override
public RecordIterator readBatch() throws IOException {
RecordIterator batch = reader.readBatch();
- return batch == null ? null : new ValueContentRowDataRecordIterator(batch);
+ return batch == null
+ ? null
+ : new ValueContentRowDataRecordIterator(batch, schemaOptions);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
index adcf504a5d38..38dcbb04848a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
@@ -18,17 +18,31 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterator {
+ private final boolean includeSequenceNumber;
+
public ValueContentRowDataRecordIterator(RecordReader.RecordIterator kvIterator) {
+ this(kvIterator, new HashMap<>(1));
+ }
+
+ public ValueContentRowDataRecordIterator(
+ RecordReader.RecordIterator kvIterator, Map schemaOptions) {
super(kvIterator);
+ this.includeSequenceNumber =
+ CoreOptions.fromMap(schemaOptions).changelogReadSequenceNumberEnabled();
}
@Override
@@ -40,6 +54,15 @@ public InternalRow next() throws IOException {
InternalRow rowData = kv.value();
rowData.setRowKind(kv.valueKind());
+
+ if (includeSequenceNumber) {
+ JoinedRow joinedRow = new JoinedRow();
+ GenericRow systemFieldsRow = new GenericRow(1);
+ systemFieldsRow.setField(0, kv.sequenceNumber());
+ joinedRow.replace(systemFieldsRow, rowData);
+ joinedRow.setRowKind(kv.valueKind());
+ return joinedRow;
+ }
return rowData;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index ee3d84b9544f..abe2f1ced20d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -75,7 +75,7 @@ private SplitRead create(Supplier supplier) {
dataSplit.dataFiles(),
dataSplit.deletionFiles().orElse(null),
false));
- return unwrap(reader);
+ return unwrap(reader, read.tableSchema().options());
};
return SplitRead.convert(read, convertedFactory);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index ec4408fb9737..7655bd3b35f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -117,7 +117,7 @@ public RecordReader createReader(Split s) throws IOException {
ProjectedRow.from(readType, mergeRead.tableSchema().logicalRowType());
reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
}
- return KeyValueTableRead.unwrap(reader);
+ return KeyValueTableRead.unwrap(reader, mergeRead.tableSchema().options());
}
private static RecordReader readDiff(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index d69209ef4e66..da9692a8d4a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -49,7 +49,8 @@ public MergeFileSplitReadProvider(
private SplitRead create(Supplier supplier) {
final MergeFileSplitRead read = supplier.get().withReadKeyType(RowType.of());
- return SplitRead.convert(read, split -> unwrap(read.createReader(split)));
+ return SplitRead.convert(
+ read, split -> unwrap(read.createReader(split), read.tableSchema().options()));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index d590e1eb5536..b6bf179c0916 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -88,24 +88,34 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
public static final String AUDIT_LOG = "audit_log";
- public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
- p -> {
- if (p.index() == 0) {
- return Optional.empty();
- }
- return Optional.of(
- new LeafPredicate(
- p.function(),
- p.type(),
- p.index() - 1,
- p.fieldName(),
- p.literals()));
- };
-
private final FileStoreTable wrapped;
+ /** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */
+ protected final int specialFieldCount;
+
public AuditLogTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
+ this.specialFieldCount =
+ coreOptions().changelogReadSequenceNumberEnabled()
+ && !wrapped.primaryKeys().isEmpty()
+ ? 2
+ : 1;
+ }
+
+ /** Creates a PredicateReplaceVisitor that adjusts field indices by specialFieldCount. */
+ private PredicateReplaceVisitor createPredicateConverter() {
+ return p -> {
+ if (p.index() < specialFieldCount) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ new LeafPredicate(
+ p.function(),
+ p.type(),
+ p.index() - specialFieldCount,
+ p.fieldName(),
+ p.literals()));
+ };
}
@Override
@@ -142,6 +152,9 @@ public String name() {
public RowType rowType() {
List fields = new ArrayList<>();
fields.add(SpecialFields.ROW_KIND);
+ if (specialFieldCount > 1) {
+ fields.add(SpecialFields.SEQUENCE_NUMBER);
+ }
fields.addAll(wrapped.rowType().getFields());
return new RowType(fields);
}
@@ -238,9 +251,10 @@ public FileIO fileIO() {
/** Push down predicate to dataScan and dataRead. */
private Optional convert(Predicate predicate) {
+ PredicateReplaceVisitor converter = createPredicateConverter();
List result =
PredicateBuilder.splitAnd(predicate).stream()
- .map(p -> p.visit(PREDICATE_CONVERTER))
+ .map(p -> p.visit(converter))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
@@ -630,6 +644,11 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
class AuditLogRead implements InnerTableRead {
+ // Special index for rowkind field
+ protected static final int ROW_KIND_INDEX = -1;
+ // Special index for _SEQUENCE_NUMBER field
+ protected static final int SEQUENCE_NUMBER_INDEX = -2;
+
protected final InnerTableRead dataRead;
protected int[] readProjection;
@@ -639,52 +658,61 @@ protected AuditLogRead(InnerTableRead dataRead) {
this.readProjection = defaultProjection();
}
- /** Default projection, just add row kind to the first. */
+ /** Default projection, add system fields (rowkind, and optionally _SEQUENCE_NUMBER). */
private int[] defaultProjection() {
int dataFieldCount = wrapped.rowType().getFieldCount();
- int[] projection = new int[dataFieldCount + 1];
- projection[0] = -1;
+ int[] projection = new int[dataFieldCount + specialFieldCount];
+ projection[0] = ROW_KIND_INDEX;
+ if (specialFieldCount > 1) {
+ projection[1] = SEQUENCE_NUMBER_INDEX;
+ }
for (int i = 0; i < dataFieldCount; i++) {
- projection[i + 1] = i;
+ projection[specialFieldCount + i] = i + specialFieldCount - 1;
}
return projection;
}
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- convert(predicate).ifPresent(dataRead::withFilter);
- return this;
- }
-
- @Override
- public InnerTableRead withReadType(RowType readType) {
- // data projection to push down to dataRead
- List dataReadFields = new ArrayList<>();
-
- // read projection to handle record returned by dataRead
+ /** Build projection array from readType. */
+ private int[] buildProjection(RowType readType) {
List fields = readType.getFields();
- int[] readProjection = new int[fields.size()];
+ int[] projection = new int[fields.size()];
+ int dataFieldIndex = 0;
- boolean rowKindAppeared = false;
for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).name();
if (fieldName.equals(SpecialFields.ROW_KIND.name())) {
- rowKindAppeared = true;
- readProjection[i] = -1;
+ projection[i] = ROW_KIND_INDEX;
+ } else if (fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
+ projection[i] = SEQUENCE_NUMBER_INDEX;
} else {
- dataReadFields.add(fields.get(i));
- // There is no row kind field. Keep it as it is
- // Row kind field has occurred, and the following fields are offset by 1
- // position
- readProjection[i] = rowKindAppeared ? i - 1 : i;
+ projection[i] = dataFieldIndex + specialFieldCount - 1;
+ dataFieldIndex++;
}
}
+ return projection;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ convert(predicate).ifPresent(dataRead::withFilter);
+ return this;
+ }
- this.readProjection = readProjection;
- dataRead.withReadType(new RowType(readType.isNullable(), dataReadFields));
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ this.readProjection = buildProjection(readType);
+ List dataFields = extractDataFields(readType);
+ dataRead.withReadType(new RowType(readType.isNullable(), dataFields));
return this;
}
+ /** Extract data fields (non-system fields) from readType. */
+ private List extractDataFields(RowType readType) {
+ return readType.getFields().stream()
+ .filter(f -> !SpecialFields.isSystemField(f.name()))
+ .collect(Collectors.toList());
+ }
+
@Override
public TableRead withIOManager(IOManager ioManager) {
this.dataRead.withIOManager(ioManager);
@@ -701,7 +729,10 @@ private InternalRow convertRow(InternalRow data) {
}
}
- /** A {@link ProjectedRow} which returns row kind when mapping index is negative. */
+ /**
+ * A {@link ProjectedRow} which returns row kind and sequence number when mapping index is
+ * negative.
+ */
static class AuditLogRow extends ProjectedRow {
AuditLogRow(int[] indexMapping, InternalRow row) {
@@ -723,15 +754,26 @@ public void setRowKind(RowKind kind) {
@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
- // row kind is always not null
+ // row kind and sequence number are never null
return false;
}
return super.isNullAt(pos);
}
+ @Override
+ public long getLong(int pos) {
+ int index = indexMapping[pos];
+ if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) {
+ // _SEQUENCE_NUMBER is at index 0 of the underlying data row
+ return row.getLong(0);
+ }
+ return super.getLong(pos);
+ }
+
@Override
public BinaryString getString(int pos) {
- if (indexMapping[pos] < 0) {
+ int index = indexMapping[pos];
+ if (index == AuditLogRead.ROW_KIND_INDEX) {
return BinaryString.fromString(row.getRowKind().shortString());
}
return super.getString(pos);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index cc8d1621a3fa..66ed8bc2f9b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -72,6 +72,9 @@ public String name() {
public RowType rowType() {
List fields = new ArrayList<>();
fields.add(SpecialFields.ROW_KIND);
+ if (specialFieldCount > 1) {
+ fields.add(SpecialFields.SEQUENCE_NUMBER);
+ }
for (DataField field : wrapped.rowType().getFields()) {
// convert to nullable
fields.add(field.newType(new ArrayType(field.type().nullable())));
@@ -102,7 +105,7 @@ public InnerTableRead withReadType(RowType readType) {
List fields = new ArrayList<>();
List wrappedReadFields = new ArrayList<>();
for (DataField field : readType.getFields()) {
- if (field.name().equals(SpecialFields.ROW_KIND.name())) {
+ if (SpecialFields.isSystemField(field.name())) {
fields.add(field);
} else {
DataField origin = field.newType(((ArrayType) field.type()).getElementType());
@@ -117,12 +120,16 @@ public InnerTableRead withReadType(RowType readType) {
@Override
public RecordReader createReader(Split split) throws IOException {
DataSplit dataSplit = (DataSplit) split;
+ // When sequence number is enabled, the underlying data layout is:
+ // [_SEQUENCE_NUMBER, pk, pt, col1, ...]
+ // We need to offset the field index to skip the sequence number field.
+ int offset = specialFieldCount - 1;
InternalRow.FieldGetter[] fieldGetters =
IntStream.range(0, wrappedReadType.getFieldCount())
.mapToObj(
i ->
InternalRow.createFieldGetter(
- wrappedReadType.getTypeAt(i), i))
+ wrappedReadType.getTypeAt(i), i + offset))
.toArray(InternalRow.FieldGetter[]::new);
if (dataSplit.isStreaming()) {
@@ -146,15 +153,23 @@ private InternalRow convertToArray(
InternalRow row1,
@Nullable InternalRow row2,
InternalRow.FieldGetter[] fieldGetters) {
- GenericRow row = new GenericRow(row1.getFieldCount());
- for (int i = 0; i < row1.getFieldCount(); i++) {
+ // seqOffset is 1 if sequence number is enabled, 0 otherwise
+ int seqOffset = specialFieldCount - 1;
+ GenericRow row = new GenericRow(fieldGetters.length + seqOffset);
+
+ // Copy sequence number if enabled (it's at index 0 in input row)
+ if (seqOffset > 0) {
+ row.setField(0, row1.getLong(0));
+ }
+
+ for (int i = 0; i < fieldGetters.length; i++) {
Object o1 = fieldGetters[i].getFieldOrNull(row1);
Object o2;
if (row2 != null) {
o2 = fieldGetters[i].getFieldOrNull(row2);
- row.setField(i, new GenericArray(new Object[] {o1, o2}));
+ row.setField(i + seqOffset, new GenericArray(new Object[] {o1, o2}));
} else {
- row.setField(i, new GenericArray(new Object[] {o1}));
+ row.setField(i + seqOffset, new GenericArray(new Object[] {o1}));
}
}
// If no row2 provided, then follow the row1 kind.
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
index 41facd1d837f..36031884ce71 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
@@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
@@ -36,7 +37,6 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -48,15 +48,62 @@
/** Unit tests for {@link AuditLogTable}. */
public class AuditLogTableTest extends TableTestBase {
- private static final String tableName = "MyTable";
- private AuditLogTable auditLogTable;
+ @Test
+ public void testReadAuditLogFromLatest() throws Exception {
+ AuditLogTable auditLogTable = createAuditLogTable("audit_table", false);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+ List expectRow = getExpectedResult();
+ List result = read(auditLogTable);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithTableOption() throws Exception {
+ AuditLogTable auditLogTable = createAuditLogTable("audit_table_with_seq", true);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(auditLogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithAlterTable() throws Exception {
+ String tableName = "audit_table_alter_seq";
+ // Create table without sequence-number option
+ AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ // Add sequence-number option via alterTable
+ catalog.alterTable(
+ identifier(tableName),
+ SchemaChange.setOption(
+ CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+ false);
- @BeforeEach
- public void before() throws Exception {
+ // Re-fetch the audit_log table to get updated schema
+ Identifier auditLogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
+ AuditLogTable updatedAuditLogTable = (AuditLogTable) catalog.getTable(auditLogTableId);
+
+ // Verify schema now includes _SEQUENCE_NUMBER
+ assertThat(updatedAuditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(updatedAuditLogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ private AuditLogTable createAuditLogTable(String tableName, boolean enableSequenceNumber)
+ throws Exception {
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
FileIO fileIO = LocalFileIO.create();
- Schema schema =
+ Schema.Builder schemaBuilder =
Schema.newBuilder()
.column("pk", DataTypes.INT())
.column("pt", DataTypes.INT())
@@ -64,44 +111,54 @@ public void before() throws Exception {
.partitionKeys("pt")
.primaryKey("pk", "pt")
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
- .option("bucket", "1")
- .build();
+ .option("bucket", "1");
+ if (enableSequenceNumber) {
+ schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
+ }
TableSchema tableSchema =
- SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath), schemaBuilder.build());
FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
- Identifier filesTableId =
+
+ writeTestData(table);
+
+ Identifier auditLogTableId =
identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
- auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+ return (AuditLogTable) catalog.getTable(auditLogTableId);
+ }
+ private void writeTestData(FileStoreTable table) throws Exception {
write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
- write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
}
- @Test
- public void testReadAuditLogFromLatest() throws Exception {
- List expectRow = getExpectedResult();
- List result = read(auditLogTable);
- assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
- }
-
private List getExpectedResult() {
List expectedRow = new ArrayList<>();
expectedRow.add(
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
expectedRow.add(
GenericRow.of(
- BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5));
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6));
+ expectedRow.add(
+ GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+ return expectedRow;
+ }
+
+ private List getExpectedResultWithSequenceNumber() {
+ List expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1, 1));
expectedRow.add(
GenericRow.of(
- BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 4, 6));
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 2L, 1, 2, 6));
expectedRow.add(
- GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+ GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 0L, 2, 3, 1));
return expectedRow;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
new file mode 100644
index 000000000000..e6feb8e8f1b1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link BinlogTable}. */
+public class BinlogTableTest extends TableTestBase {
+
+ @Test
+ public void testReadBinlogFromLatest() throws Exception {
+ BinlogTable binlogTable = createBinlogTable("binlog_table", false);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ List result = read(binlogTable);
+ List expectRow = getExpectedResult();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithTableOption() throws Exception {
+ BinlogTable binlogTable = createBinlogTable("binlog_table_with_seq", true);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(binlogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithAlterTable() throws Exception {
+ String tableName = "binlog_table_alter_seq";
+ // Create table without sequence-number option
+ BinlogTable binlogTable = createBinlogTable(tableName, false);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ // Add sequence-number option via alterTable
+ catalog.alterTable(
+ identifier(tableName),
+ SchemaChange.setOption(
+ CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+ false);
+
+ // Re-fetch the binlog table to get updated schema
+ Identifier binlogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + BinlogTable.BINLOG);
+ BinlogTable updatedBinlogTable = (BinlogTable) catalog.getTable(binlogTableId);
+
+ // Verify schema now includes _SEQUENCE_NUMBER
+ assertThat(updatedBinlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(updatedBinlogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ private BinlogTable createBinlogTable(String tableName, boolean enableSequenceNumber)
+ throws Exception {
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
+ FileIO fileIO = LocalFileIO.create();
+
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option("bucket", "1");
+ if (enableSequenceNumber) {
+ schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
+ }
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath), schemaBuilder.build());
+ FileStoreTable table =
+ FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+
+ writeTestData(table);
+
+ Identifier binlogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + BinlogTable.BINLOG);
+ return (BinlogTable) catalog.getTable(binlogTableId);
+ }
+
+ private void writeTestData(FileStoreTable table) throws Exception {
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
+ }
+
+ private List getExpectedResult() {
+ List expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.DELETE.shortString()),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {6})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.INSERT.shortString()),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {3}),
+ new GenericArray(new Object[] {1})));
+ return expectedRow;
+ }
+
+ private List getExpectedResultWithSequenceNumber() {
+ List expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.DELETE.shortString()),
+ 1L,
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ 2L,
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {6})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.INSERT.shortString()),
+ 0L,
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {3}),
+ new GenericArray(new Object[] {1})));
+ return expectedRow;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index f955418b6d21..c2a17412fe78 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -29,14 +29,13 @@
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import java.io.IOException;
-import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
-
/** An {@link InnerTableRead} that reads the data changed before and after compaction. */
public class LookupCompactDiffRead extends AbstractDataTableRead {
private final SplitRead fullPhaseMergeRead;
@@ -46,7 +45,11 @@ public LookupCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema schema) {
super(schema);
this.incrementalDiffRead = new IncrementalCompactDiffSplitRead(mergeRead);
this.fullPhaseMergeRead =
- SplitRead.convert(mergeRead, split -> unwrap(mergeRead.createReader(split)));
+ SplitRead.convert(
+ mergeRead,
+ split ->
+ KeyValueTableRead.unwrap(
+ mergeRead.createReader(split), schema.options()));
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 22271c8a7ebe..bea70497071b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -1053,6 +1053,104 @@ public void testBinlogTableWithProjection() {
.containsExactly(Row.of("+I", new String[] {"A"}));
}
+ @Test
+ public void testAuditLogTableWithSequenceNumberEnabled() {
+ // Primary key table created with 'changelog-read.sequence-number.enabled' = 'true'
+ sql(
+ "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) "
+ + "WITH ('changelog-read.sequence-number.enabled'='true');");
+ sql("INSERT INTO test_table_seq VALUES (1, 2)");
+ sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+ // SELECT * is expected to yield rowkind, _SEQUENCE_NUMBER, a, b and the computed c (a + b)
+ assertThat(sql("SELECT * FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), Row.of("+I", 1L, 3, 4, 7));
+
+ // Projection in a different column order, with _SEQUENCE_NUMBER selected last
+ assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+ // Projection of only the two metadata columns: _SEQUENCE_NUMBER and rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testAuditLogTableWithSequenceNumberAlterTable() {
+ // Primary key table created without the sequence-number option
+ sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b);");
+ sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+ sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+ // Enable changelog-read.sequence-number.enabled via ALTER TABLE after the rows are written
+ sql("ALTER TABLE test_table_dyn SET ('changelog-read.sequence-number.enabled'='true')");
+
+ // SELECT * is expected to match testAuditLogTableWithSequenceNumberEnabled:
+ // rowkind, _SEQUENCE_NUMBER, a, b and the computed c (a + b)
+ assertThat(sql("SELECT * FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), Row.of("+I", 1L, 3, 4, 7));
+
+ // Projection in a different column order, with _SEQUENCE_NUMBER selected last
+ assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+ // Projection of only the two metadata columns: _SEQUENCE_NUMBER and rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testBinlogTableWithSequenceNumberEnabled() {
+ // Primary key table created with 'changelog-read.sequence-number.enabled' = 'true'
+ sql(
+ "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int) "
+ + "WITH ('changelog-read.sequence-number.enabled'='true');");
+ sql("INSERT INTO test_table_seq VALUES (1, 2)");
+ sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+ // SELECT * is expected to yield rowkind, _SEQUENCE_NUMBER, then a and b as 1-element arrays
+ assertThat(sql("SELECT * FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+ Row.of("+I", 1L, new Integer[] {3}, new Integer[] {4}));
+
+ // Projection in a different column order, with _SEQUENCE_NUMBER selected last
+ assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] {4}, 1L));
+
+ // Projection of only the two metadata columns: _SEQUENCE_NUMBER and rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testBinlogTableWithSequenceNumberAlterTable() {
+ // Primary key table created without the sequence-number option
+ sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b int);");
+ sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+ sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+ // Enable changelog-read.sequence-number.enabled via ALTER TABLE after the rows are written
+ sql("ALTER TABLE test_table_dyn SET ('changelog-read.sequence-number.enabled'='true')");
+
+ // SELECT * is expected to match testBinlogTableWithSequenceNumberEnabled:
+ // rowkind, _SEQUENCE_NUMBER, then a and b as 1-element arrays
+ assertThat(sql("SELECT * FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+ Row.of("+I", 1L, new Integer[] {3}, new Integer[] {4}));
+
+ // Projection in a different column order, with _SEQUENCE_NUMBER selected last
+ assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] {4}, 1L));
+
+ // Projection of only the two metadata columns: _SEQUENCE_NUMBER and rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
@Test
public void testBatchReadSourceWithSnapshot() {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222), (3, 33, 333), (4, 44, 444)");