Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
<td>String</td>
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
</tr>
<tr>
<td><h5>changelog-read.sequence-number.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,14 @@ public InlineElement getDescription() {
.withDescription(
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");

/**
 * Opt-in switch (default {@code false}) controlling whether the {@code _SEQUENCE_NUMBER} field
 * is included in the {@code audit_log} and {@code binlog} system tables.
 */
public static final ConfigOption<Boolean> CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED =
key("changelog-read.sequence-number.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. "
+ "This is only valid for primary key tables.");

public static final ConfigOption<String> SEQUENCE_FIELD =
key("sequence.field")
.stringType()
Expand Down Expand Up @@ -2721,6 +2729,10 @@ public List<String> changelogRowDeduplicateIgnoreFields() {
.orElse(Collections.emptyList());
}

/**
 * Returns the configured value of {@link #CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED}.
 *
 * @return {@code true} if the sequence number option is switched on for this table
 */
public boolean changelogReadSequenceNumberEnabled() {
    final boolean sequenceNumberEnabled = options.get(CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED);
    return sequenceNumberEnabled;
}

/**
 * Returns the configured value of {@link #SCAN_PLAN_SORT_PARTITION}.
 *
 * @return the boolean stored under that option
 */
public boolean scanPlanSortPartition() {
    final boolean sortPartition = options.get(SCAN_PLAN_SORT_PARTITION);
    return sortPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public static void validateTableSchema(TableSchema schema) {
validateIncrementalClustering(schema, options);

validateChainTable(schema, options);

validateChangelogReadSequenceNumber(schema, options);
}

public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
Expand Down Expand Up @@ -713,4 +715,15 @@ public static void validateChainTable(TableSchema schema, CoreOptions options) {
"Partition timestamp formatter is required for chain table.");
}
}

/**
 * Rejects schemas that enable the sequence number option without declaring primary keys.
 *
 * @param schema the table schema under validation
 * @param options the core options parsed for that schema
 * @throws IllegalArgumentException presumably raised by {@code checkArgument} when the option
 *     is enabled on a table without primary keys (exception type not visible here)
 */
private static void validateChangelogReadSequenceNumber(
        TableSchema schema, CoreOptions options) {
    // Nothing to validate unless the option has been switched on.
    if (!options.changelogReadSequenceNumberEnabled()) {
        return;
    }

    boolean hasPrimaryKeys = !schema.primaryKeys().isEmpty();
    checkArgument(
            hasPrimaryKeys,
            "Cannot enable '%s' for non-primary-key table. "
                    + "Sequence number is only available for primary key tables.",
            CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -161,14 +162,17 @@ public RecordReader<InternalRow> reader(Split split) throws IOException {
throw new RuntimeException("Should not happen.");
}

public static RecordReader<InternalRow> unwrap(RecordReader<KeyValue> reader) {
public static RecordReader<InternalRow> unwrap(
RecordReader<KeyValue> reader, Map<String, String> schemaOptions) {
return new RecordReader<InternalRow>() {

@Nullable
@Override
public RecordIterator<InternalRow> readBatch() throws IOException {
RecordIterator<KeyValue> batch = reader.readBatch();
return batch == null ? null : new ValueContentRowDataRecordIterator(batch);
return batch == null
? null
: new ValueContentRowDataRecordIterator(batch, schemaOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,31 @@

package org.apache.paimon.table.source;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.reader.RecordReader;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterator {

private final boolean includeSequenceNumber;

/**
 * Creates an iterator from the key-value iterator alone, delegating to the two-argument
 * constructor with an empty option map.
 *
 * @param kvIterator the key-value iterator to unwrap
 */
public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> kvIterator) {
// No table options available here: pass an empty, throwaway map.
this(kvIterator, new HashMap<>(1));
}

public ValueContentRowDataRecordIterator(
RecordReader.RecordIterator<KeyValue> kvIterator, Map<String, String> schemaOptions) {
super(kvIterator);
this.includeSequenceNumber =
CoreOptions.fromMap(schemaOptions).changelogReadSequenceNumberEnabled();
}

@Override
Expand All @@ -40,6 +54,15 @@ public InternalRow next() throws IOException {

InternalRow rowData = kv.value();
rowData.setRowKind(kv.valueKind());

if (includeSequenceNumber) {
JoinedRow joinedRow = new JoinedRow();
GenericRow systemFieldsRow = new GenericRow(1);
systemFieldsRow.setField(0, kv.sequenceNumber());
joinedRow.replace(systemFieldsRow, rowData);
joinedRow.setRowKind(kv.valueKind());
return joinedRow;
}
return rowData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) {
dataSplit.dataFiles(),
dataSplit.deletionFiles().orElse(null),
false));
return unwrap(reader);
return unwrap(reader, read.tableSchema().options());
};

return SplitRead.convert(read, convertedFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public RecordReader<InternalRow> createReader(Split s) throws IOException {
ProjectedRow.from(readType, mergeRead.tableSchema().logicalRowType());
reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
}
return KeyValueTableRead.unwrap(reader);
return KeyValueTableRead.unwrap(reader, mergeRead.tableSchema().options());
}

private static RecordReader<KeyValue> readDiff(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public MergeFileSplitReadProvider(

private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) {
final MergeFileSplitRead read = supplier.get().withReadKeyType(RowType.of());
return SplitRead.convert(read, split -> unwrap(read.createReader(split)));
return SplitRead.convert(
read, split -> unwrap(read.createReader(split), read.tableSchema().options()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,34 @@ public class AuditLogTable implements DataTable, ReadonlyTable {

public static final String AUDIT_LOG = "audit_log";

public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
p -> {
if (p.index() == 0) {
return Optional.empty();
}
return Optional.of(
new LeafPredicate(
p.function(),
p.type(),
p.index() - 1,
p.fieldName(),
p.literals()));
};

private final FileStoreTable wrapped;

/** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */
protected final int specialFieldCount;

public AuditLogTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
this.specialFieldCount =
coreOptions().changelogReadSequenceNumberEnabled()
&& !wrapped.primaryKeys().isEmpty()
? 2
: 1;
}

/**
 * Creates a {@link PredicateReplaceVisitor} that rewrites leaf predicates for the wrapped
 * table.
 *
 * <p>A predicate whose field index falls within the leading special fields yields an empty
 * result; any other predicate is rebuilt with its index reduced by {@link #specialFieldCount}.
 */
private PredicateReplaceVisitor createPredicateConverter() {
    return leaf -> {
        int auditLogIndex = leaf.index();
        boolean onSpecialField = auditLogIndex < specialFieldCount;
        if (onSpecialField) {
            return Optional.empty();
        }

        int dataIndex = auditLogIndex - specialFieldCount;
        Predicate shifted =
                new LeafPredicate(
                        leaf.function(),
                        leaf.type(),
                        dataIndex,
                        leaf.fieldName(),
                        leaf.literals());
        return Optional.of(shifted);
    };
}

@Override
Expand Down Expand Up @@ -142,6 +152,9 @@ public String name() {
public RowType rowType() {
    // Layout: ROW_KIND first, then SEQUENCE_NUMBER when more than one special field is
    // configured, then every field of the wrapped table in its original order.
    List<DataField> dataFields = wrapped.rowType().getFields();
    List<DataField> auditLogFields = new ArrayList<>(specialFieldCount + dataFields.size());

    auditLogFields.add(SpecialFields.ROW_KIND);
    boolean exposeSequenceNumber = specialFieldCount > 1;
    if (exposeSequenceNumber) {
        auditLogFields.add(SpecialFields.SEQUENCE_NUMBER);
    }
    auditLogFields.addAll(dataFields);

    return new RowType(auditLogFields);
}
Expand Down Expand Up @@ -238,9 +251,10 @@ public FileIO fileIO() {

/** Push down predicate to dataScan and dataRead. */
private Optional<Predicate> convert(Predicate predicate) {
PredicateReplaceVisitor converter = createPredicateConverter();
List<Predicate> result =
PredicateBuilder.splitAnd(predicate).stream()
.map(p -> p.visit(PREDICATE_CONVERTER))
.map(p -> p.visit(converter))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
Expand Down Expand Up @@ -630,6 +644,11 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta

class AuditLogRead implements InnerTableRead {

// Negative projection sentinel marking a position that is served from the row kind
// rather than from a data field of the underlying row.
protected static final int ROW_KIND_INDEX = -1;
// Negative projection sentinel marking a position that is served from the
// _SEQUENCE_NUMBER field.
protected static final int SEQUENCE_NUMBER_INDEX = -2;

protected final InnerTableRead dataRead;

protected int[] readProjection;
Expand All @@ -639,52 +658,61 @@ protected AuditLogRead(InnerTableRead dataRead) {
this.readProjection = defaultProjection();
}

/** Default projection, just add row kind to the first. */
/** Default projection, add system fields (rowkind, and optionally _SEQUENCE_NUMBER). */
private int[] defaultProjection() {
int dataFieldCount = wrapped.rowType().getFieldCount();
int[] projection = new int[dataFieldCount + 1];
projection[0] = -1;
int[] projection = new int[dataFieldCount + specialFieldCount];
projection[0] = ROW_KIND_INDEX;
if (specialFieldCount > 1) {
projection[1] = SEQUENCE_NUMBER_INDEX;
}
for (int i = 0; i < dataFieldCount; i++) {
projection[i + 1] = i;
projection[specialFieldCount + i] = i + specialFieldCount - 1;
}
return projection;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
convert(predicate).ifPresent(dataRead::withFilter);
return this;
}

@Override
public InnerTableRead withReadType(RowType readType) {
// data projection to push down to dataRead
List<DataField> dataReadFields = new ArrayList<>();

// read projection to handle record returned by dataRead
/** Build projection array from readType. */
private int[] buildProjection(RowType readType) {
List<DataField> fields = readType.getFields();
int[] readProjection = new int[fields.size()];
int[] projection = new int[fields.size()];
int dataFieldIndex = 0;

boolean rowKindAppeared = false;
for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).name();
if (fieldName.equals(SpecialFields.ROW_KIND.name())) {
rowKindAppeared = true;
readProjection[i] = -1;
projection[i] = ROW_KIND_INDEX;
} else if (fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
projection[i] = SEQUENCE_NUMBER_INDEX;
} else {
dataReadFields.add(fields.get(i));
// There is no row kind field. Keep it as it is
// Row kind field has occurred, and the following fields are offset by 1
// position
readProjection[i] = rowKindAppeared ? i - 1 : i;
projection[i] = dataFieldIndex + specialFieldCount - 1;
dataFieldIndex++;
}
}
return projection;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
convert(predicate).ifPresent(dataRead::withFilter);
return this;
}

this.readProjection = readProjection;
dataRead.withReadType(new RowType(readType.isNullable(), dataReadFields));
/**
 * Applies a read type expressed against this table's row type.
 *
 * <p>Stores the projection derived from {@code readType} and hands only the non-system
 * fields to the wrapped read, keeping the nullability of {@code readType}.
 *
 * <p>NOTE(review): {@code buildProjection} special-cases only the rowkind and sequence number
 * names, while {@code extractDataFields} drops everything {@code SpecialFields.isSystemField}
 * accepts; confirm the two criteria always agree.
 */
@Override
public InnerTableRead withReadType(RowType readType) {
    this.readProjection = buildProjection(readType);
    RowType dataReadType = new RowType(readType.isNullable(), extractDataFields(readType));
    dataRead.withReadType(dataReadType);
    return this;
}

/**
 * Returns the fields of {@code readType} whose names are not system fields, preserving their
 * order.
 */
private List<DataField> extractDataFields(RowType readType) {
    List<DataField> dataFields = new ArrayList<>();
    for (DataField field : readType.getFields()) {
        if (!SpecialFields.isSystemField(field.name())) {
            dataFields.add(field);
        }
    }
    return dataFields;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
this.dataRead.withIOManager(ioManager);
Expand All @@ -701,7 +729,10 @@ private InternalRow convertRow(InternalRow data) {
}
}

/** A {@link ProjectedRow} which returns row kind when mapping index is negative. */
/**
* A {@link ProjectedRow} which returns row kind and sequence number when mapping index is
* negative.
*/
static class AuditLogRow extends ProjectedRow {

AuditLogRow(int[] indexMapping, InternalRow row) {
Expand All @@ -723,15 +754,26 @@ public void setRowKind(RowKind kind) {
@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
// row kind is always not null
// row kind and sequence num are always not null
return false;
}
return super.isNullAt(pos);
}

/**
 * Reads a long at the given position, serving the sequence number from the underlying row
 * when the position maps to {@code SEQUENCE_NUMBER_INDEX}.
 */
@Override
public long getLong(int pos) {
    boolean isSequenceNumber = indexMapping[pos] == AuditLogRead.SEQUENCE_NUMBER_INDEX;
    // _SEQUENCE_NUMBER sits at index 0 of the underlying row.
    return isSequenceNumber ? row.getLong(0) : super.getLong(pos);
}

@Override
public BinaryString getString(int pos) {
if (indexMapping[pos] < 0) {
int index = indexMapping[pos];
if (index == AuditLogRead.ROW_KIND_INDEX) {
return BinaryString.fromString(row.getRowKind().shortString());
}
return super.getString(pos);
Expand Down
Loading
Loading