Conversation

Contributor

@buvb buvb commented Dec 16, 2025

Purpose

Linked issue: close #2128

This PR enables schema evolution for datalake-enabled tables, specifically supporting ADD COLUMN ... LAST with NULLABLE columns. When a user executes ALTER TABLE ADD COLUMN on a lake-enabled table, the schema change is first applied to Fluss (source of truth), then synchronized to Paimon.

Brief change log

  • CoordinatorService: Pass LakeCatalog and LakeCatalogContext to alterTableSchema() for Paimon synchronization
  • MetadataManager: Add syncSchemaChangesToLake() to sync schema changes to Paimon after the Fluss schema update; skip schema registration if the schema is unchanged (retry idempotency)
  • SchemaUpdate: Support idempotent addColumn() - if the column already exists with the same type and comment, treat it as a no-op
  • PaimonLakeCatalog: Handle ColumnAlreadyExistException as idempotent success for retry scenarios
  • PaimonConversions: Map Fluss AddColumn to Paimon SchemaChange, inserting the new column before the system columns
  • FlussRecordAsPaimonRow: Handle the tiering transition period when a Fluss record is wider than the Paimon schema, using min(internalRow.getFieldCount(), businessFieldCount)
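The field-count handling in the last bullet can be sketched in plain Java (the names and array representation here are hypothetical; the real code operates on the Fluss InternalRow and the Paimon row type):

```java
import java.util.Arrays;

public class FieldCountSketch {
    // Copy only the fields both sides know about: during a tiering
    // transition the Fluss record may be wider than the Paimon schema,
    // so we cap the copy at min(sourceFieldCount, businessFieldCount).
    static Object[] project(Object[] flussFields, int businessFieldCount) {
        int n = Math.min(flussFields.length, businessFieldCount);
        Object[] out = new Object[businessFieldCount];
        for (int i = 0; i < n; i++) {
            out[i] = flussFields[i];
        }
        // Fields beyond the source width stay null (Paimon-wider case).
        return out;
    }

    public static void main(String[] args) {
        // Fluss record wider than Paimon schema: the extra field is ignored.
        System.out.println(Arrays.toString(project(new Object[] {1, "a", true}, 2)));
        // Paimon schema wider than Fluss record: the missing field is padded with null.
        System.out.println(Arrays.toString(project(new Object[] {1}, 2)));
    }
}
```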

Tests

  • LakeEnabledTableCreateITCase#testAlterLakeEnabledTableSchema - Verify ADD COLUMN syncs to Paimon with correct type and comment
  • FlussRecordAsPaimonRowTest#testFlussRecordWiderThanPaimonSchema - Verify tiering doesn't crash when Fluss record has more fields than Paimon schema
  • FlussRecordAsPaimonRowTest#testPaimonSchemaWiderThanFlussRecord - Verify padding NULL for missing fields

API and Format

No API or storage format changes.

Documentation

No documentation changes required for this MVP. Future documentation may be needed when more schema evolution operations are supported.

Contributor

@loserwang1024 loserwang1024 left a comment

@buvb Thanks for your great work. I have left some advice.

Would you like to add a test in PaimonTieringITCase or PaimonTieringTest for an e2e test (just like what PaimonTieringITCase#testTieringForAlterTable does)? Add a column before and between tiering service runs.

private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
private final int businessFieldCount;
Contributor

maybe dataFieldCount?

Contributor Author

I prefer businessFieldCount because it clearly distinguishes from system columns (__bucket, __offset, __timestamp). dataFieldCount might be confused with total field count.

// update the schema
zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
// update the schema in Fluss (ZK) first - Fluss is the source of truth
if (!newSchema.equals(table.getSchema())) {
Contributor

@loserwang1024 loserwang1024 Dec 17, 2025

It would be better to check whether the lakehouse can apply the change before zookeeperClient.registerSchema:

  1. Check that Fluss can apply the schema change:
    SchemaUpdate.applySchemaChanges(table, schemaChanges)
  2. Check whether the schema can be applied to the lake.
  3. zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1). Failure here should rarely happen, because it only involves ZooKeeper.
  4. syncSchemaChangesToLake.

For example, the Iceberg catalog currently does not support adding columns. If zookeeperClient.registerSchema succeeds first, a problem will occur.

Another remaining problem: what if zookeeperClient.registerSchema succeeds but syncSchemaChangesToLake fails? Assume that:

  1. Add column A
  • zookeeperClient.registerSchema succeeds
  • syncSchemaChangesToLake fails
  2. Add column B
  • zookeeperClient.registerSchema succeeds
  • syncSchemaChangesToLake succeeds
  3. Re-add column A
  • SchemaUpdate.applySchemaChanges is idempotent
  • syncSchemaChangesToLake succeeds

Finally, the Fluss table is: column A, column B
Finally, the lake table is: column B, column A

I advise that each time before zookeeperClient.registerSchema, we check whether the column count is the same:

  1. Add column A
  • zookeeperClient.registerSchema succeeds
  • syncSchemaChangesToLake fails
  2. Add column B
  • The column count does not match, so just refuse it.

@wuchong , WDYT?
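The proposed refusal guard could look roughly like this pure-Java sketch (the method name and error message are hypothetical, not the PR's actual code):

```java
import java.util.List;

public class ColumnCountGuard {
    // Refuse a new schema change if a previous lake sync failed and left
    // Fluss and the lake with different column counts (hypothetical check).
    static void checkInSync(List<String> flussColumns, List<String> lakeColumns) {
        if (flussColumns.size() != lakeColumns.size()) {
            throw new IllegalStateException(
                    "Fluss and lake schemas are out of sync ("
                            + flussColumns.size() + " vs " + lakeColumns.size()
                            + " columns); retry the previous schema change first.");
        }
    }

    public static void main(String[] args) {
        checkInSync(List.of("a", "b"), List.of("a", "b")); // in sync: proceed
        try {
            // Column A was registered in ZK but its lake sync failed earlier.
            checkInSync(List.of("a", "b"), List.of("b"));
        } catch (IllegalStateException e) {
            System.out.println("refused: " + e.getMessage());
        }
    }
}
```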

Contributor Author

ZK success but Lake sync fails
This is already handled by the idempotent design:

  • SchemaUpdate.addColumn() - treats an existing column with the same type/comment as a no-op
  • PaimonLakeCatalog.alterTable() - catches ColumnAlreadyExistException as success

So when user retries, both Fluss and Paimon will handle it gracefully.
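That no-op rule can be sketched in plain Java (names and the Column record are hypothetical; the real checks live in SchemaUpdate and PaimonLakeCatalog):

```java
import java.util.Objects;

public class IdempotentAddColumn {
    record Column(String name, String type, String comment) {}

    // Re-adding a column that already exists with the same type and comment
    // is treated as a no-op (retry success); a mismatch is a real conflict.
    static boolean isNoOp(Column existing, Column toAdd) {
        if (existing == null) {
            return false; // genuinely new column: must be added
        }
        if (!existing.type().equals(toAdd.type())
                || !Objects.equals(existing.comment(), toAdd.comment())) {
            throw new IllegalArgumentException(
                    "Column " + toAdd.name() + " exists with a different type/comment");
        }
        return true; // same definition: skip the add
    }

    public static void main(String[] args) {
        Column existing = new Column("c3", "INT", "new col");
        System.out.println(isNoOp(existing, new Column("c3", "INT", "new col")));
        System.out.println(isNoOp(null, new Column("c4", "STRING", null)));
    }
}
```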

Contributor

@loserwang1024 loserwang1024 Dec 17, 2025

@buvb Yes, if no other add column happens in between, it will be handled gracefully. However, as I mentioned before:

Assume that:

  1. Add column A
  • zookeeperClient.registerSchema succeeds
  • syncSchemaChangesToLake fails
  2. Add column B
  • zookeeperClient.registerSchema succeeds
  • syncSchemaChangesToLake succeeds
  3. Re-add column A
  • SchemaUpdate.applySchemaChanges is idempotent
  • syncSchemaChangesToLake succeeds

Finally, the Fluss table is: column A, column B
Finally, the lake table is: column B, column A

I have talked with Jark. He advises adding the column in the lakehouse before Fluss. Maybe each time before applying a schema change to Paimon, we can get the Paimon rowType and check whether it is safe:

RowType rowType = paimonCatalog.getTable(new Identifier(tablePath.getDatabaseName(), tablePath.getTableName())).rowType();

@loserwang1024
Contributor

@luoyuxia @wuchong, would you like to help with a final review?

Contributor Author

buvb commented Dec 23, 2025

@luoyuxia @wuchong The main changes are as follows:

  • Implemented lake-first schema sync, idempotent AddColumn handling, Paimon addColumn mapping, and tiering safeguards (fail when Fluss is wider, pad NULL when Paimon is wider), plus IT/unit coverage.
  • Recent fixes: PaimonTieringITCase now writes values for the new column after ADD COLUMN and asserts actual column names; offset check uses a dynamic index to avoid type mismatches after schema changes.
  • Testing: In an environment that allows binding ports, please run ./mvnw -pl fluss-lake/fluss-lake-paimon -DskipTests=false test. In restricted environments, ReCreateSameTableAfterTieringTest fails due to port bind being denied; all other tests pass.

Member

@wuchong wuchong left a comment

Could you please rebase your branch to trigger a fresh CI run? The base branch is quite outdated, and running against the latest changes will help uncover any potential issues that might otherwise be hidden.

assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
Member

Should assert with getTimestamp here, because this column is a timestamp type.

} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
// shouldn't happen before we support schema change
} catch (Catalog.ColumnAlreadyExistException e) {
// Column already exists, treat as idempotent success for retry scenarios.
Member

Given that we may execute multiple TableChange operations in a single statement (e.g., adding several columns at once), blindly ignoring ColumnAlreadyExistException could silently skip the addition of some columns, leading to an incomplete schema update.

A simpler and safer approach, in my view, is to compare the current Paimon table schema with the expected target schema before performing any ALTER TABLE (like how you did in org.apache.fluss.server.coordinator.MetadataManager#alterTableSchema):

  • If the schemas differ, proceed with the ALTER TABLE and report any errors faithfully to the user (who can then re-execute if needed).
  • If the schemas already match, log a clear message (e.g., “Column(s) already exist—skipping ALTER TABLE”) and skip the operation.

This ensures correctness, avoids silent failures, and provides transparent feedback to the user.
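The compare-then-alter flow could be sketched as follows (pure Java with hypothetical names; the real code would compare Paimon's rowType against the target Fluss schema):

```java
import java.util.List;

public class SchemaDiffBeforeAlter {
    // Only issue ALTER TABLE when the current schema differs from the
    // target; otherwise log a clear message and skip the operation.
    static String decide(List<String> currentColumns, List<String> targetColumns) {
        if (currentColumns.equals(targetColumns)) {
            return "Column(s) already exist - skipping ALTER TABLE";
        }
        return "ALTER TABLE to " + targetColumns;
    }

    public static void main(String[] args) {
        // Schemas differ: proceed with the alter and surface any real errors.
        System.out.println(decide(List.of("a", "b"), List.of("a", "b", "c3")));
        // Schemas already match (e.g. a retry): skip with a clear log line.
        System.out.println(decide(List.of("a", "b", "c3"), List.of("a", "b", "c3")));
    }
}
```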

// Update Fluss schema (ZK) after Lake sync succeeds
if (!newSchema.equals(table.getSchema())) {
zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
}
Member

log a clear message (e.g., “Column(s) already exist—skipping ALTER TABLE”) and skip the operation.

if (existingColumn != null) {
// Allow idempotent retries: if column name/type/comment match existing, treat as no-op
if (!existingColumn.getDataType().equals(addColumn.getDataType())
|| !Objects.equals(existingColumn.getComment(), addColumn.getComment())) {
Member

Suggested change
|| !Objects.equals(existingColumn.getComment(), addColumn.getComment())) {
|| !Objects.equals(existingColumn.getComment().orElse(null), addColumn.getComment())) {

The return type of existingColumn.getComment() is Optional<String>, which is not directly comparable with addColumn.getComment().

List<InternalRow> allRows = new ArrayList<>();
allRows.addAll(initialRows);
allRows.addAll(newRows);
checkDataInPaimonAppendOnlyTable(tablePath, allRows, 0);
Member

I suggest constructing an expected list of rows that includes all six columns (the new column, original user columns, and relevant system columns), and then asserting that this expected list exactly matches the rows read from Paimon. This will ensure comprehensive validation of both schema evolution and data correctness.

List<String> fieldNames = paimonTable.rowType().getFieldNames();

// Should have: a, b, c3, __bucket, __offset, __timestamp
assertThat(fieldNames).contains("a", "b", "c3");
Member

Assert exactly all the field names and their order, e.g. assertThat(fieldNames).containsExactly("a", "b", "c3", "__bucket", "__offset", "__timestamp").


Development

Successfully merging this pull request may close these issues.

Support schema evolution for data lake enabled table for paimon
